mcp_runner/
lib.rs

/*!
 # MCP Runner

 A Rust library for running and interacting with Model Context Protocol (MCP) servers.

 ## Overview

 MCP Runner provides functionality to:
 - Start and manage MCP server processes
 - Communicate with MCP servers using JSON-RPC
 - List and call tools provided by MCP servers
 - Access resources exposed by MCP servers
 - Optionally expose the servers to external clients through an SSE (Server-Sent Events) proxy

 ## Basic Usage

 ```no_run
 use mcp_runner::{McpRunner, Result};
 use serde_json::{json, Value};

 #[tokio::main]
 async fn main() -> Result<()> {
     // Create a runner from config file
     let mut runner = McpRunner::from_config_file("config.json")?;

     // Start all configured servers
     let server_ids = runner.start_all_servers().await?;

     // Or start a specific server
     let server_id = runner.start_server("fetch").await?;

     // Get a client to interact with the server
     let client = runner.get_client(server_id)?;

     // Initialize the client
     client.initialize().await?;

     // List available tools
     let tools = client.list_tools().await?;
     println!("Available tools: {:?}", tools);

     // Call a tool
     let args = json!({
         "url": "https://modelcontextprotocol.io"
     });
     let result: Value = client.call_tool("fetch", &args).await?;

     println!("Result: {:?}", result);

     Ok(())
 }
 ```

 ## Features

 - **Server Management**: Start, stop, and monitor MCP servers
 - **JSON-RPC Communication**: Communicate with MCP servers using JSON-RPC
 - **Configuration**: Configure servers through JSON config files
 - **Error Handling**: Comprehensive error handling
 - **Async Support**: Full async/await support
 - **SSE Proxy**: Support for SSE proxying with authentication and CORS

 ## License

 This project is licensed under the terms in the LICENSE file.
*/
67
68pub mod client;
69pub mod config;
70pub mod error;
71pub mod server;
72pub mod sse_proxy;
73pub mod transport;
74
75pub use client::McpClient;
76pub use config::Config;
77pub use error::{Error, Result};
78pub use server::{ServerId, ServerProcess, ServerStatus};
79pub use sse_proxy::SSEProxyHandle;
80
81use std::collections::HashMap;
82use std::path::Path;
83use std::sync::Arc;
84use transport::StdioTransport;
85
86use sse_proxy::types::ServerInfo;
87use sse_proxy::{SSEProxy, SSEProxyRunnerAccess};
88
89/// Configure and run MCP servers
90///
91/// This struct is the main entry point for managing MCP server lifecycles
92/// and obtaining clients to interact with them.
93/// All public methods are instrumented with `tracing` spans.
94pub struct McpRunner {
95    /// Configuration
96    config: Config,
97    /// Running server processes
98    servers: HashMap<ServerId, ServerProcess>,
99    /// Map of server names to server IDs
100    server_names: HashMap<String, ServerId>,
101    /// SSE proxy handle (if running)
102    sse_proxy_handle: Option<SSEProxyHandle>,
103    /// Cached clients for servers
104    clients: HashMap<ServerId, Option<McpClient>>,
105}
106
107impl McpRunner {
108    /// Create a new MCP runner from a configuration file path
109    ///
110    /// This method is instrumented with `tracing`.
111    #[tracing::instrument(skip(path), fields(config_path = ?path.as_ref()))]
112    pub fn from_config_file(path: impl AsRef<Path>) -> Result<Self> {
113        tracing::info!("Loading configuration from file");
114        let config = Config::from_file(path)?;
115        Ok(Self::new(config))
116    }
117
118    /// Create a new MCP runner from a configuration string
119    ///
120    /// This method is instrumented with `tracing`.
121    #[tracing::instrument(skip(config))]
122    pub fn from_config_str(config: &str) -> Result<Self> {
123        tracing::info!("Loading configuration from string");
124        let config = Config::parse_from_str(config)?;
125        Ok(Self::new(config))
126    }
127
128    /// Create a new MCP runner from a configuration
129    ///
130    /// This method is instrumented with `tracing`.
131    #[tracing::instrument(skip(config), fields(num_servers = config.mcp_servers.len()))]
132    pub fn new(config: Config) -> Self {
133        tracing::info!("Creating new McpRunner");
134        Self {
135            config,
136            servers: HashMap::new(),
137            server_names: HashMap::new(),
138            sse_proxy_handle: None,
139            clients: HashMap::new(),
140        }
141    }
142
143    /// Start a specific MCP server
144    ///
145    /// This method is instrumented with `tracing`.
146    #[tracing::instrument(skip(self), fields(server_name = %name))]
147    pub async fn start_server(&mut self, name: &str) -> Result<ServerId> {
148        // Check if server is already running
149        if let Some(id) = self.server_names.get(name) {
150            tracing::debug!(server_id = %id, "Server already running");
151            return Ok(*id);
152        }
153
154        tracing::info!("Attempting to start server");
155        // Get server configuration
156        let config = self
157            .config
158            .mcp_servers
159            .get(name)
160            .ok_or_else(|| {
161                tracing::error!("Configuration not found for server");
162                Error::ServerNotFound(name.to_string())
163            })?
164            .clone();
165
166        // Create and start server process
167        let mut server = ServerProcess::new(name.to_string(), config);
168        let id = server.id();
169        tracing::debug!(server_id = %id, "Created ServerProcess instance");
170
171        server.start().await.map_err(|e| {
172            tracing::error!(error = %e, "Failed to start server process");
173            e
174        })?;
175
176        // Store server
177        tracing::debug!(server_id = %id, "Storing running server process");
178        self.servers.insert(id, server);
179        self.server_names.insert(name.to_string(), id);
180
181        // Notify SSE proxy about the new server if it's running
182        if let Some(proxy) = &self.sse_proxy_handle {
183            let status = format!("{:?}", ServerStatus::Running);
184            if let Err(e) = proxy.update_server_info(name, Some(id), &status).await {
185                tracing::warn!(
186                    error = %e,
187                    server = %name,
188                    "Failed to update server info in SSE proxy"
189                );
190
191                // If the server wasn't in the proxy cache yet, try to add it
192                let server_info = ServerInfo {
193                    name: name.to_string(),
194                    id: format!("{:?}", id),
195                    status: status.clone(),
196                };
197
198                // Try to add the server information to the proxy
199                if let Err(e) = proxy.add_server_info(name, server_info.clone()).await {
200                    tracing::warn!(
201                        error = %e,
202                        server = %name,
203                        "Failed to add server to SSE proxy cache"
204                    );
205                } else {
206                    tracing::debug!(server = %name, "Added new server to SSE proxy cache");
207                }
208            } else {
209                tracing::debug!(server = %name, "Updated SSE proxy with new server information");
210            }
211        }
212
213        tracing::info!(server_id = %id, "Server started successfully");
214        Ok(id)
215    }
216
217    /// Start all configured servers
218    ///
219    /// This method is instrumented with `tracing`.
220    #[tracing::instrument(skip(self))]
221    pub async fn start_all_servers(&mut self) -> Result<Vec<ServerId>> {
222        tracing::info!("Starting all configured servers");
223        // Collect server names first to avoid borrowing issues
224        let server_names: Vec<String> = self
225            .config
226            .mcp_servers
227            .keys()
228            .map(|k| k.to_string())
229            .collect();
230        tracing::debug!(servers_to_start = ?server_names);
231
232        // Start servers sequentially but with improved error handling
233        let mut ids = Vec::new();
234        let mut errors = Vec::new();
235
236        for name in server_names {
237            match self.start_server(&name).await {
238                Ok(id) => ids.push(id),
239                Err(e) => {
240                    tracing::error!(server_name = %name, error = %e, "Failed to start server");
241                    errors.push((name, e));
242                }
243            }
244        }
245
246        if !errors.is_empty() {
247            tracing::warn!(
248                num_failed = errors.len(),
249                "Some servers failed to start: {:?}",
250                errors
251                    .iter()
252                    .map(|(name, _): &(String, Error)| name.as_str())
253                    .collect::<Vec<_>>()
254            );
255            // Create an aggregate error message including all failures
256            if errors.len() == 1 {
257                return Err(errors.remove(0).1);
258            } else {
259                let error_msg = errors
260                    .iter()
261                    .map(|(name, e)| format!("{}: {}", name, e))
262                    .collect::<Vec<_>>()
263                    .join("; ");
264                return Err(Error::Other(format!(
265                    "Multiple servers failed to start: {}",
266                    error_msg
267                )));
268            }
269        }
270
271        tracing::info!(num_started = ids.len(), "Finished starting all servers");
272        Ok(ids)
273    }
274
275    /// Start all configured servers and the SSE proxy if configured
276    ///
277    /// This is a convenience method that starts all configured MCP servers
278    /// and then starts the SSE proxy if it's configured. This ensures that
279    /// all servers are available before the proxy begins accepting connections.
280    ///
281    /// # Returns
282    ///
283    /// A tuple containing:
284    /// - A `Result<Vec<ServerId>>` with server IDs for all started servers, or an error
285    /// - A `bool` indicating whether the SSE proxy was started
286    ///
287    /// # Examples
288    ///
289    /// ```no_run
290    /// use mcp_runner::McpRunner;
291    ///
292    /// #[tokio::main]
293    /// async fn main() -> mcp_runner::Result<()> {
294    ///     // Create runner from config
295    ///     let mut runner = McpRunner::from_config_file("config.json")?;
296    ///
297    ///     // Start all servers and proxy if configured
298    ///     let (server_ids, proxy_started) = runner.start_all_with_proxy().await;
299    ///     
300    ///     // Check if servers started successfully
301    ///     let server_ids = server_ids?;
302    ///     println!("Started {} servers", server_ids.len());
303    ///     
304    ///     if proxy_started {
305    ///         println!("SSE proxy started successfully");
306    ///     }
307    ///     
308    ///     Ok(())
309    /// }
310    /// ```
311    ///
312    /// This method is instrumented with `tracing`.
313    #[tracing::instrument(skip(self))]
314    pub async fn start_all_with_proxy(&mut self) -> (Result<Vec<ServerId>>, bool) {
315        // First start all servers
316        let server_result = self.start_all_servers().await;
317
318        // Only attempt to start proxy if servers started successfully
319        let proxy_started = if server_result.is_ok() && self.is_sse_proxy_configured() {
320            match self.start_sse_proxy().await {
321                Ok(_) => {
322                    tracing::info!("SSE proxy started automatically");
323                    true
324                }
325                Err(e) => {
326                    tracing::warn!(error = %e, "Failed to start SSE proxy");
327                    false
328                }
329            }
330        } else {
331            if self.is_sse_proxy_configured() {
332                tracing::warn!("Not starting SSE proxy because servers failed to start");
333            }
334            false
335        };
336
337        (server_result, proxy_started)
338    }
339
340    /// Stop a running server
341    ///
342    /// This method is instrumented with `tracing`.
343    #[tracing::instrument(skip(self), fields(server_id = %id))]
344    pub async fn stop_server(&mut self, id: ServerId) -> Result<()> {
345        tracing::info!("Attempting to stop server");
346        if let Some(mut server) = self.servers.remove(&id) {
347            let name = server.name().to_string();
348            tracing::debug!(server_name = %name, "Found server process to stop");
349            self.server_names.remove(&name);
350
351            server.stop().await.map_err(|e| {
352                tracing::error!(error = %e, "Failed to stop server process");
353                e
354            })?;
355
356            // Notify SSE proxy about the server being stopped
357            if let Some(proxy) = &self.sse_proxy_handle {
358                if let Err(e) = proxy.update_server_info(&name, None, "Stopped").await {
359                    tracing::warn!(
360                        error = %e,
361                        server = %name,
362                        "Failed to update SSE proxy with server stopped status"
363                    );
364                } else {
365                    tracing::debug!(server = %name, "Updated SSE proxy with server stopped status");
366                }
367            }
368
369            tracing::info!("Server stopped successfully");
370            Ok(())
371        } else {
372            tracing::warn!("Attempted to stop a server that was not found or not running");
373            Err(Error::ServerNotFound(format!("{:?}", id)))
374        }
375    }
376
377    /// Stop all running servers and the SSE proxy if it's running
378    ///
379    /// This method stops all running servers and the SSE proxy (if running).
380    /// It collects all errors but only returns the first one encountered.
381    ///
382    /// # Returns
383    ///
384    /// A `Result<()>` indicating success or the first error encountered.
385    ///
386    /// # Examples
387    ///
388    /// ```no_run
389    /// use mcp_runner::McpRunner;
390    ///
391    /// #[tokio::main]
392    /// async fn main() -> mcp_runner::Result<()> {
393    ///     let mut runner = McpRunner::from_config_file("config.json")?;
394    ///     runner.start_all_with_proxy().await;
395    ///     
396    ///     // Later, stop everything
397    ///     runner.stop_all_servers().await?;
398    ///     println!("All servers and proxy stopped");
399    ///     
400    ///     Ok(())
401    /// }
402    /// ```
403    ///
404    /// This method is instrumented with `tracing`.
405    #[tracing::instrument(skip(self))]
406    pub async fn stop_all_servers(&mut self) -> Result<()> {
407        tracing::info!("Stopping all servers and proxy if running");
408
409        // Collect all server IDs first to avoid borrowing issues
410        let server_ids: Vec<ServerId> = self.servers.keys().copied().collect();
411
412        // Stop the SSE proxy first if it's running
413        if let Some(proxy_handle) = self.sse_proxy_handle.take() {
414            tracing::info!("Stopping SSE proxy");
415            if let Err(e) = proxy_handle.shutdown().await {
416                tracing::warn!(error = %e, "Error shutting down SSE proxy");
417                // We continue anyway since we're in the process of clean-up
418            }
419            tracing::info!("SSE proxy stopped");
420        }
421
422        // Stop servers sequentially but with improved error handling
423        let mut errors = Vec::new();
424
425        for id in server_ids {
426            match self.stop_server(id).await {
427                Ok(_) => {}
428                Err(e) => {
429                    tracing::error!(server_id = ?id, error = %e, "Failed to stop server");
430                    errors.push((id, e));
431                }
432            }
433        }
434
435        if errors.is_empty() {
436            tracing::info!("All servers stopped successfully");
437            Ok(())
438        } else {
439            tracing::warn!(error_count = errors.len(), "Some servers failed to stop");
440            // Create an aggregate error message including all failures
441            if errors.len() == 1 {
442                return Err(errors.remove(0).1);
443            } else {
444                let error_msg = errors
445                    .iter()
446                    .map(|(id, e)| format!("{:?}: {}", id, e))
447                    .collect::<Vec<_>>()
448                    .join("; ");
449                return Err(Error::Other(format!(
450                    "Multiple servers failed to stop: {}",
451                    error_msg
452                )));
453            }
454        }
455    }
456
457    /// Get server status
458    ///
459    /// This method is instrumented with `tracing`.
460    #[tracing::instrument(skip(self), fields(server_id = %id))]
461    pub fn server_status(&self, id: ServerId) -> Result<ServerStatus> {
462        tracing::debug!("Getting server status");
463        self.servers
464            .get(&id)
465            .map(|server| {
466                let status = server.status();
467                tracing::trace!(status = ?status);
468                status
469            })
470            .ok_or_else(|| {
471                tracing::warn!("Status requested for unknown server");
472                Error::ServerNotFound(format!("{:?}", id))
473            })
474    }
475
476    /// Get server ID by name
477    ///
478    /// This method is instrumented with `tracing`.
479    #[tracing::instrument(skip(self), fields(server_name = %name))]
480    pub fn get_server_id(&self, name: &str) -> Result<ServerId> {
481        tracing::debug!("Getting server ID by name");
482        self.server_names.get(name).copied().ok_or_else(|| {
483            tracing::warn!("Server ID requested for unknown server name");
484            Error::ServerNotFound(name.to_string())
485        })
486    }
487
488    /// Get a client for a server
489    ///
490    /// This method is instrumented with `tracing`.
491    ///
492    /// If the client already exists in cache, a `ClientAlreadyCached` error is returned.
493    /// In this case, you can retrieve the cached client using specialized methods like
494    /// `get_server_tools` that handle the cache internally.
495    #[tracing::instrument(skip(self), fields(server_id = %id))]
496    pub fn get_client(&mut self, id: ServerId) -> Result<McpClient> {
497        tracing::info!("Getting client for server");
498
499        // First check if we already have a client for this server
500        if let Some(Some(_client)) = self.clients.get(&id) {
501            tracing::debug!("Client already exists in cache");
502            // Return a specific error type for this case
503            return Err(Error::ClientAlreadyCached);
504        }
505
506        // Check if we've already tried to get a client but it failed
507        if let Some(None) = self.clients.get(&id) {
508            tracing::warn!("Previously failed to create client for this server");
509            return Err(Error::ServerNotFound(format!(
510                "{:?} (client creation previously failed)",
511                id
512            )));
513        }
514
515        // Create a new client
516        let server = self.servers.get_mut(&id).ok_or_else(|| {
517            tracing::error!("Client requested for unknown or stopped server");
518            Error::ServerNotFound(format!("{:?}", id))
519        })?;
520        let server_name = server.name().to_string();
521        tracing::debug!(server_name = %server_name, "Found server process");
522
523        tracing::debug!("Taking stdin/stdout from server process");
524        let stdin = match server.take_stdin() {
525            Ok(stdin) => stdin,
526            Err(e) => {
527                tracing::error!(error = %e, "Failed to take stdin from server");
528                // Mark this server as failed in our clients cache
529                self.clients.insert(id, None);
530                return Err(e);
531            }
532        };
533
534        let stdout = match server.take_stdout() {
535            Ok(stdout) => stdout,
536            Err(e) => {
537                tracing::error!(error = %e, "Failed to take stdout from server");
538                // Mark this server as failed in our clients cache
539                self.clients.insert(id, None);
540                return Err(e);
541            }
542        };
543
544        tracing::debug!("Creating StdioTransport and McpClient");
545        let transport = StdioTransport::new(server_name.clone(), stdin, stdout);
546        let client = McpClient::new(server_name, transport);
547
548        // Store the client in our cache
549        self.clients.insert(id, Some(client.clone()));
550
551        tracing::info!("Client created successfully");
552        Ok(client)
553    }
554
555    /// Start the SSE proxy server if enabled in configuration
556    ///
557    /// This method is instrumented with `tracing`.
558    #[tracing::instrument(skip(self))]
559    pub async fn start_sse_proxy(&mut self) -> Result<()> {
560        if let Some(proxy_config) = &self.config.sse_proxy {
561            tracing::info!("Initializing SSE proxy server");
562
563            // Create the runner access functions
564            let runner_access = SSEProxyRunnerAccess {
565                get_server_id: Arc::new({
566                    let self_clone = self.clone(); // Clone self to move into the closure
567                    move |name: &str| self_clone.get_server_id(name)
568                }),
569                get_client: Arc::new({
570                    let self_clone = self.clone(); // Clone self to move into the closure
571                    move |id: ServerId| {
572                        // We need a mutable reference to self, which we can't have in a closure
573                        // Create a new client using the same logic as get_client, but without caching
574                        let servers = &self_clone.servers;
575                        if let Some(server) = servers.get(&id) {
576                            // We can't actually take stdin/stdout from the server in this closure because we only
577                            // have a shared reference. Instead, we'll create a new client to talk to the existing server.
578                            // This is inefficient but necessary for the proxy's design.
579                            let server_name = server.name().to_string();
580                            match McpClient::connect(&server_name, &self_clone.config) {
581                                Ok(client) => Ok(client),
582                                Err(e) => {
583                                    tracing::error!(error = %e, server_id = ?id, "Failed to create client for SSE proxy");
584                                    Err(e)
585                                }
586                            }
587                        } else {
588                            Err(Error::ServerNotFound(format!("{:?}", id)))
589                        }
590                    }
591                }),
592                get_allowed_servers: Arc::new({
593                    let config = self.config.clone(); // Clone config to move into the closure
594                    move || {
595                        // Extract the allowed_servers from the sse_proxy config if present
596                        config
597                            .sse_proxy
598                            .as_ref()
599                            .and_then(|proxy_config| proxy_config.allowed_servers.clone())
600                    }
601                }),
602                get_server_config_keys: Arc::new({
603                    let config = self.config.clone(); // Clone config to move into the closure
604                    move || {
605                        // Return all server names from the config
606                        config.mcp_servers.keys().cloned().collect()
607                    }
608                }),
609            };
610
611            // Convert the config reference to an owned value
612            let proxy_config_owned = proxy_config.clone();
613
614            // Start the proxy with the runner access functions
615            let proxy_handle = SSEProxy::start_proxy(runner_access, proxy_config_owned).await?;
616
617            // Store the proxy handle
618            self.sse_proxy_handle = Some(proxy_handle);
619
620            tracing::info!(
621                "SSE proxy server started on {}:{}",
622                proxy_config.address,
623                proxy_config.port
624            );
625            Ok(())
626        } else {
627            tracing::warn!("SSE proxy not configured, skipping start");
628            Err(Error::Other(
629                "SSE proxy not configured in config".to_string(),
630            ))
631        }
632    }
633
634    /// Check if the SSE proxy is enabled in configuration
635    ///
636    /// This method is instrumented with `tracing`.
637    #[tracing::instrument(skip(self))]
638    pub fn is_sse_proxy_configured(&self) -> bool {
639        self.config.sse_proxy.is_some()
640    }
641
642    /// Get the SSE proxy configuration if it exists
643    ///
644    /// Retrieves a reference to the SSE proxy configuration from the runner's config.
645    /// This is useful for accessing proxy settings like address and port.
646    ///
647    /// # Returns
648    ///
649    /// A `Result` containing a reference to the `SSEProxyConfig` if configured,
650    /// or an `Error::Other` if no SSE proxy is configured.
651    ///
652    /// # Examples
653    ///
654    /// ```no_run
655    /// use mcp_runner::McpRunner;
656    ///
657    /// #[tokio::main]
658    /// async fn main() -> mcp_runner::Result<()> {
659    ///     let runner = McpRunner::from_config_file("config.json")?;
660    ///     
661    ///     if runner.is_sse_proxy_configured() {
662    ///         let proxy_config = runner.get_sse_proxy_config()?;
663    ///         println!("SSE proxy will listen on {}:{}", proxy_config.address, proxy_config.port);
664    ///     }
665    ///     
666    ///     Ok(())
667    /// }
668    /// ```
669    ///
670    /// This method is instrumented with `tracing`.
671    #[tracing::instrument(skip(self))]
672    pub fn get_sse_proxy_config(&self) -> Result<&config::SSEProxyConfig> {
673        tracing::debug!("Getting SSE proxy configuration");
674        self.config.sse_proxy.as_ref().ok_or_else(|| {
675            tracing::warn!("SSE proxy configuration requested but not configured");
676            Error::Other("SSE proxy not configured".to_string())
677        })
678    }
679
680    /// Get the running SSE proxy handle if it exists
681    ///
682    /// This method provides access to the running SSE proxy handle, which can be used
683    /// to communicate with the proxy or control it.
684    /// Note that this will only return a value if the proxy was previously started
685    /// with `start_sse_proxy()` or `start_all_with_proxy()`.
686    ///
687    /// # Returns
688    ///
689    /// A `Result` containing a reference to the running `SSEProxyHandle` instance,
690    /// or an `Error::Other` if no SSE proxy is running.
691    ///
692    /// # Examples
693    ///
694    /// ```no_run
695    /// use mcp_runner::McpRunner;
696    ///
697    /// #[tokio::main]
698    /// async fn main() -> mcp_runner::Result<()> {
699    ///     let mut runner = McpRunner::from_config_file("config.json")?;
700    ///     
701    ///     // Start servers and proxy
702    ///     let (server_ids, proxy_started) = runner.start_all_with_proxy().await;
703    ///     let _server_ids = server_ids?;
704    ///     
705    ///     if proxy_started {
706    ///         // Access the running proxy handle
707    ///         let proxy_handle = runner.get_sse_proxy_handle()?;
708    ///         let config = proxy_handle.config();
709    ///         println!("SSE proxy is running on {}:{}", config.address, config.port);
710    ///     }
711    ///     
712    ///     Ok(())
713    /// }
714    /// ```
715    ///
716    /// This method is instrumented with `tracing`.
717    #[tracing::instrument(skip(self))]
718    pub fn get_sse_proxy_handle(&self) -> Result<&SSEProxyHandle> {
719        tracing::debug!("Getting SSE proxy handle");
720        self.sse_proxy_handle.as_ref().ok_or_else(|| {
721            tracing::warn!("SSE proxy handle requested but no proxy is running");
722            Error::Other("SSE proxy not running".to_string())
723        })
724    }
725
726    /// Get status for all running servers
727    ///
728    /// This method returns a HashMap of server names to their current status.
729    /// This is a convenience method that can be called at any time to check on all servers.
730    ///
731    /// # Returns
732    ///
733    /// A `HashMap<String, ServerStatus>` containing the status of all currently running servers.
734    ///
735    /// # Examples
736    ///
737    /// ```no_run
738    /// use mcp_runner::McpRunner;
739    ///
740    /// #[tokio::main]
741    /// async fn main() -> mcp_runner::Result<()> {
742    ///     let mut runner = McpRunner::from_config_file("config.json")?;
743    ///     runner.start_all_servers().await?;
744    ///     
745    ///     // Check status of all servers
746    ///     let statuses = runner.get_all_server_statuses();
747    ///     for (name, status) in statuses {
748    ///         println!("Server '{}' status: {:?}", name, status);
749    ///     }
750    ///     
751    ///     Ok(())
752    /// }
753    /// ```
754    ///
755    /// This method is instrumented with `tracing`.
756    #[tracing::instrument(skip(self))]
757    pub fn get_all_server_statuses(&self) -> HashMap<String, ServerStatus> {
758        tracing::debug!("Getting status for all running servers");
759        let mut statuses = HashMap::new();
760
761        for (server_name, server_id) in &self.server_names {
762            if let Some(server) = self.servers.get(server_id) {
763                let status = server.status();
764                statuses.insert(server_name.clone(), status);
765                tracing::trace!(server = %server_name, status = ?status);
766            }
767        }
768
769        tracing::debug!(num_servers = statuses.len(), "Collected server statuses");
770        statuses
771    }
772
773    /// Get a list of available tools for a specific server
774    ///
775    /// This is a convenience method that creates a temporary client to query tools from a server.
776    /// Unlike `get_client().list_tools()`, this method handles all the client creation and cleanup internally.
777    ///
778    /// # Returns
779    ///
780    /// A `Result` containing a vector of tools provided by the specified server.
781    ///
782    /// # Examples
783    ///
784    /// ```no_run
785    /// use mcp_runner::McpRunner;
786    ///
787    /// #[tokio::main]
788    /// async fn main() -> mcp_runner::Result<()> {
789    ///     let mut runner = McpRunner::from_config_file("config.json")?;
790    ///     runner.start_server("fetch").await?;
791    ///     
792    ///     // Get tools for a specific server
793    ///     let tools = runner.get_server_tools("fetch").await?;
794    ///     for tool in tools {
795    ///         println!("Tool: {} - {}", tool.name, tool.description);
796    ///     }
797    ///     
798    ///     Ok(())
799    /// }
800    /// ```
801    ///
802    /// This method is instrumented with `tracing`.
803    #[tracing::instrument(skip(self), fields(server_name = %name))]
804    pub async fn get_server_tools(&mut self, name: &str) -> Result<Vec<client::Tool>> {
805        tracing::info!("Getting tools for server '{}'", name);
806
807        // Get server ID
808        let server_id = self.get_server_id(name)?;
809
810        // Check if we already have a client for this server
811        let client_from_cache = if let Some(Some(_client)) = self.clients.get(&server_id) {
812            tracing::debug!("Using cached client");
813            // Return a specific error type for this case
814            true
815        } else {
816            false
817        };
818
819        // Get or create client
820        let result: Result<Vec<client::Tool>> = if client_from_cache {
821            // Use cached client
822            let client = self.clients.get(&server_id).unwrap().as_ref().unwrap();
823
824            // Initialize the client
825            client.initialize().await.map_err(|e| {
826                tracing::error!(error = %e, "Failed to initialize client");
827                e
828            })?;
829
830            // List tools
831            client.list_tools().await.map_err(|e| {
832                tracing::error!(error = %e, "Failed to list tools for server");
833                e
834            })
835        } else {
836            // Create a new client
837            match self.get_client(server_id) {
838                Ok(client) => {
839                    // Initialize the client
840                    client.initialize().await.map_err(|e| {
841                        tracing::error!(error = %e, "Failed to initialize client");
842                        e
843                    })?;
844
845                    // List tools
846                    client.list_tools().await.map_err(|e| {
847                        tracing::error!(error = %e, "Failed to list tools for server");
848                        e
849                    })
850                }
851                Err(e) => {
852                    tracing::error!(error = %e, "Failed to get client");
853                    Err(e)
854                }
855            }
856        };
857
858        match &result {
859            Ok(tools) => {
860                let tools_len = tools.len();
861                tracing::info!(server = %name, num_tools = tools_len, "Successfully retrieved tools");
862            }
863            Err(e) => {
864                tracing::error!(server = %name, error = %e, "Failed to get tools");
865            }
866        }
867
868        result
869    }
870
871    /// Get tools for all running servers
872    ///
873    /// This method returns a HashMap of server names to their available tools.
874    /// This is a convenience method that can be called at any time to check tools for all running servers.
875    ///
876    /// # Returns
877    ///
878    /// A `HashMap<String, Result<Vec<Tool>>>` containing the tools of all currently running servers.
879    /// The Result indicates whether listing tools was successful for each server.
880    ///
881    /// # Examples
882    ///
883    /// ```no_run
884    /// use mcp_runner::McpRunner;
885    ///
886    /// #[tokio::main]
887    /// async fn main() -> mcp_runner::Result<()> {
888    ///     let mut runner = McpRunner::from_config_file("config.json")?;
889    ///     runner.start_all_servers().await?;
890    ///     
891    ///     // Get tools for all servers
892    ///     let all_tools = runner.get_all_server_tools().await;
893    ///     for (server_name, tools_result) in all_tools {
894    ///         match tools_result {
895    ///             Ok(tools) => {
896    ///                 println!("Server '{}' tools:", server_name);
897    ///                 for tool in tools {
898    ///                     println!(" - {}: {}", tool.name, tool.description);
899    ///                 }
900    ///             },
901    ///             Err(e) => println!("Failed to get tools for '{}': {}", server_name, e),
902    ///         }
903    ///     }
904    ///     
905    ///     Ok(())
906    /// }
907    /// ```
908    ///
909    /// This method is instrumented with `tracing`.
910    #[tracing::instrument(skip(self))]
911    pub async fn get_all_server_tools(&mut self) -> HashMap<String, Result<Vec<client::Tool>>> {
912        tracing::debug!("Getting tools for all running servers");
913        let mut all_tools = HashMap::new();
914
915        // Need to collect keys to avoid borrowing issues
916        let server_names: Vec<String> = self.server_names.keys().cloned().collect();
917
918        // Process each server sequentially with timeout protection
919        for name in server_names {
920            tracing::debug!(server = %name, "Getting tools");
921            // For each server, get its tools with a timeout to prevent hanging
922            let result = tokio::time::timeout(
923                std::time::Duration::from_secs(15),
924                self.get_server_tools(&name),
925            )
926            .await;
927
928            // Process the result, handling timeout case separately
929            let final_result = match result {
930                Ok(inner_result) => inner_result,
931                Err(_) => {
932                    tracing::warn!(server = %name, "Timed out getting tools");
933                    Err(Error::Timeout(format!(
934                        "Tool listing for server '{}' timed out",
935                        name
936                    )))
937                }
938            };
939
940            // Store the result (success or error) in the map
941            all_tools.insert(name, final_result);
942        }
943
944        tracing::debug!(
945            num_servers = all_tools.len(),
946            "Collected tools for all servers"
947        );
948        all_tools
949    }
950
951    /// Create a snapshot of current server information
952    ///
953    /// This creates a HashMap of server names to their ServerInfo which can be used
954    /// by the SSE proxy to report accurate server status information.
955    ///
956    /// This method is instrumented with `tracing`.
957    #[tracing::instrument(skip(self))]
958    fn get_server_info_snapshot(&self) -> HashMap<String, ServerInfo> {
959        tracing::debug!("Creating server information snapshot for SSE proxy");
960        let mut server_info = HashMap::new();
961
962        for (name, id) in &self.server_names {
963            if let Some(server) = self.servers.get(id) {
964                let status = server.status();
965                server_info.insert(
966                    name.clone(),
967                    ServerInfo {
968                        name: name.clone(),
969                        id: format!("{:?}", id),
970                        status: format!("{:?}", status),
971                    },
972                );
973                tracing::trace!(server = %name, status = ?status, "Added server to snapshot");
974            }
975        }
976
977        server_info
978    }
979}
980
981impl Clone for McpRunner {
982    fn clone(&self) -> Self {
983        Self {
984            config: self.config.clone(),
985            servers: self.servers.clone(),
986            server_names: self.server_names.clone(),
987            sse_proxy_handle: self.sse_proxy_handle.clone(),
988            clients: HashMap::new(), // We don't clone clients as they can't be cleanly cloned
989        }
990    }
991}