mcp_runner/lib.rs
1/*!
2 # MCP Runner
3
4 A Rust library for running and interacting with Model Context Protocol (MCP) servers.
5
6 ## Overview
7
8 MCP Runner provides functionality to:
9 - Start and manage MCP server processes
10 - Communicate with MCP servers using JSON-RPC
11 - List and call tools provided by MCP servers
12 - Access resources exposed by MCP servers
13 - Optionally proxy SSE (Server-Sent Events) to the servers for external clients
14
15 ## Basic Usage
16
17 ```no_run
18 use mcp_runner::{McpRunner, Result};
19 use serde_json::{json, Value};
20
21 #[tokio::main]
22 async fn main() -> Result<()> {
23 // Create a runner from config file
24 let mut runner = McpRunner::from_config_file("config.json")?;
25
26 // Start all configured servers
27 let server_ids = runner.start_all_servers().await?;
28
29 // Or start a specific server
30 let server_id = runner.start_server("fetch").await?;
31
32 // Get a client to interact with the server
33 let client = runner.get_client(server_id)?;
34
35 // Initialize the client
36 client.initialize().await?;
37
38 // List available tools
39 let tools = client.list_tools().await?;
40 println!("Available tools: {:?}", tools);
41
42 // Call a tool
43 let args = json!({
44 "url": "https://modelcontextprotocol.io"
45 });
46 let result: Value = client.call_tool("fetch", &args).await?;
47
48 println!("Result: {:?}", result);
49
50 Ok(())
51 }
52 ```
53
54 ## Features
55
56 - **Server Management**: Start, stop, and monitor MCP servers
57 - **JSON-RPC Communication**: Communicate with MCP servers using JSON-RPC
58 - **Configuration**: Configure servers through JSON config files
59 - **Error Handling**: Comprehensive error handling
60 - **Async Support**: Full async/await support
61 - **SSE Proxy**: Support for SSE proxying with authentication and CORS
62
63 ## License
64
65 This project is licensed under the terms in the LICENSE file.
66*/
67
68pub mod client;
69pub mod config;
70pub mod error;
71pub mod server;
72pub mod sse_proxy;
73pub mod transport;
74
75pub use client::McpClient;
76pub use config::Config;
77pub use error::{Error, Result};
78pub use server::{ServerId, ServerProcess, ServerStatus};
79pub use sse_proxy::SSEProxyHandle;
80
81use std::collections::HashMap;
82use std::path::Path;
83use std::sync::Arc;
84use transport::StdioTransport;
85
86use sse_proxy::types::ServerInfo;
87use sse_proxy::{SSEProxy, SSEProxyRunnerAccess};
88
89/// Configure and run MCP servers
90///
91/// This struct is the main entry point for managing MCP server lifecycles
92/// and obtaining clients to interact with them.
93/// All public methods are instrumented with `tracing` spans.
94pub struct McpRunner {
95 /// Configuration
96 config: Config,
97 /// Running server processes
98 servers: HashMap<ServerId, ServerProcess>,
99 /// Map of server names to server IDs
100 server_names: HashMap<String, ServerId>,
101 /// SSE proxy handle (if running)
102 sse_proxy_handle: Option<SSEProxyHandle>,
103 /// Cached clients for servers
104 clients: HashMap<ServerId, Option<McpClient>>,
105}
106
107impl McpRunner {
108 /// Create a new MCP runner from a configuration file path
109 ///
110 /// This method is instrumented with `tracing`.
111 #[tracing::instrument(skip(path), fields(config_path = ?path.as_ref()))]
112 pub fn from_config_file(path: impl AsRef<Path>) -> Result<Self> {
113 tracing::info!("Loading configuration from file");
114 let config = Config::from_file(path)?;
115 Ok(Self::new(config))
116 }
117
118 /// Create a new MCP runner from a configuration string
119 ///
120 /// This method is instrumented with `tracing`.
121 #[tracing::instrument(skip(config))]
122 pub fn from_config_str(config: &str) -> Result<Self> {
123 tracing::info!("Loading configuration from string");
124 let config = Config::parse_from_str(config)?;
125 Ok(Self::new(config))
126 }
127
128 /// Create a new MCP runner from a configuration
129 ///
130 /// This method is instrumented with `tracing`.
131 #[tracing::instrument(skip(config), fields(num_servers = config.mcp_servers.len()))]
132 pub fn new(config: Config) -> Self {
133 tracing::info!("Creating new McpRunner");
134 Self {
135 config,
136 servers: HashMap::new(),
137 server_names: HashMap::new(),
138 sse_proxy_handle: None,
139 clients: HashMap::new(),
140 }
141 }
142
143 /// Start a specific MCP server
144 ///
145 /// This method is instrumented with `tracing`.
146 #[tracing::instrument(skip(self), fields(server_name = %name))]
147 pub async fn start_server(&mut self, name: &str) -> Result<ServerId> {
148 // Check if server is already running
149 if let Some(id) = self.server_names.get(name) {
150 tracing::debug!(server_id = %id, "Server already running");
151 return Ok(*id);
152 }
153
154 tracing::info!("Attempting to start server");
155 // Get server configuration
156 let config = self
157 .config
158 .mcp_servers
159 .get(name)
160 .ok_or_else(|| {
161 tracing::error!("Configuration not found for server");
162 Error::ServerNotFound(name.to_string())
163 })?
164 .clone();
165
166 // Create and start server process
167 let mut server = ServerProcess::new(name.to_string(), config);
168 let id = server.id();
169 tracing::debug!(server_id = %id, "Created ServerProcess instance");
170
171 server.start().await.map_err(|e| {
172 tracing::error!(error = %e, "Failed to start server process");
173 e
174 })?;
175
176 // Store server
177 tracing::debug!(server_id = %id, "Storing running server process");
178 self.servers.insert(id, server);
179 self.server_names.insert(name.to_string(), id);
180
181 // Notify SSE proxy about the new server if it's running
182 if let Some(proxy) = &self.sse_proxy_handle {
183 let status = format!("{:?}", ServerStatus::Running);
184 if let Err(e) = proxy.update_server_info(name, Some(id), &status).await {
185 tracing::warn!(
186 error = %e,
187 server = %name,
188 "Failed to update server info in SSE proxy"
189 );
190
191 // If the server wasn't in the proxy cache yet, try to add it
192 let server_info = ServerInfo {
193 name: name.to_string(),
194 id: format!("{:?}", id),
195 status: status.clone(),
196 };
197
198 // Try to add the server information to the proxy
199 if let Err(e) = proxy.add_server_info(name, server_info.clone()).await {
200 tracing::warn!(
201 error = %e,
202 server = %name,
203 "Failed to add server to SSE proxy cache"
204 );
205 } else {
206 tracing::debug!(server = %name, "Added new server to SSE proxy cache");
207 }
208 } else {
209 tracing::debug!(server = %name, "Updated SSE proxy with new server information");
210 }
211 }
212
213 tracing::info!(server_id = %id, "Server started successfully");
214 Ok(id)
215 }
216
217 /// Start all configured servers
218 ///
219 /// This method is instrumented with `tracing`.
220 #[tracing::instrument(skip(self))]
221 pub async fn start_all_servers(&mut self) -> Result<Vec<ServerId>> {
222 tracing::info!("Starting all configured servers");
223 // Collect server names first to avoid borrowing issues
224 let server_names: Vec<String> = self
225 .config
226 .mcp_servers
227 .keys()
228 .map(|k| k.to_string())
229 .collect();
230 tracing::debug!(servers_to_start = ?server_names);
231
232 // Start servers sequentially but with improved error handling
233 let mut ids = Vec::new();
234 let mut errors = Vec::new();
235
236 for name in server_names {
237 match self.start_server(&name).await {
238 Ok(id) => ids.push(id),
239 Err(e) => {
240 tracing::error!(server_name = %name, error = %e, "Failed to start server");
241 errors.push((name, e));
242 }
243 }
244 }
245
246 if !errors.is_empty() {
247 tracing::warn!(
248 num_failed = errors.len(),
249 "Some servers failed to start: {:?}",
250 errors
251 .iter()
252 .map(|(name, _): &(String, Error)| name.as_str())
253 .collect::<Vec<_>>()
254 );
255 // Create an aggregate error message including all failures
256 if errors.len() == 1 {
257 return Err(errors.remove(0).1);
258 } else {
259 let error_msg = errors
260 .iter()
261 .map(|(name, e)| format!("{}: {}", name, e))
262 .collect::<Vec<_>>()
263 .join("; ");
264 return Err(Error::Other(format!(
265 "Multiple servers failed to start: {}",
266 error_msg
267 )));
268 }
269 }
270
271 tracing::info!(num_started = ids.len(), "Finished starting all servers");
272 Ok(ids)
273 }
274
275 /// Start all configured servers and the SSE proxy if configured
276 ///
277 /// This is a convenience method that starts all configured MCP servers
278 /// and then starts the SSE proxy if it's configured. This ensures that
279 /// all servers are available before the proxy begins accepting connections.
280 ///
281 /// # Returns
282 ///
283 /// A tuple containing:
284 /// - A `Result<Vec<ServerId>>` with server IDs for all started servers, or an error
285 /// - A `bool` indicating whether the SSE proxy was started
286 ///
287 /// # Examples
288 ///
289 /// ```no_run
290 /// use mcp_runner::McpRunner;
291 ///
292 /// #[tokio::main]
293 /// async fn main() -> mcp_runner::Result<()> {
294 /// // Create runner from config
295 /// let mut runner = McpRunner::from_config_file("config.json")?;
296 ///
297 /// // Start all servers and proxy if configured
298 /// let (server_ids, proxy_started) = runner.start_all_with_proxy().await;
299 ///
300 /// // Check if servers started successfully
301 /// let server_ids = server_ids?;
302 /// println!("Started {} servers", server_ids.len());
303 ///
304 /// if proxy_started {
305 /// println!("SSE proxy started successfully");
306 /// }
307 ///
308 /// Ok(())
309 /// }
310 /// ```
311 ///
312 /// This method is instrumented with `tracing`.
313 #[tracing::instrument(skip(self))]
314 pub async fn start_all_with_proxy(&mut self) -> (Result<Vec<ServerId>>, bool) {
315 // First start all servers
316 let server_result = self.start_all_servers().await;
317
318 // Only attempt to start proxy if servers started successfully
319 let proxy_started = if server_result.is_ok() && self.is_sse_proxy_configured() {
320 match self.start_sse_proxy().await {
321 Ok(_) => {
322 tracing::info!("SSE proxy started automatically");
323 true
324 }
325 Err(e) => {
326 tracing::warn!(error = %e, "Failed to start SSE proxy");
327 false
328 }
329 }
330 } else {
331 if self.is_sse_proxy_configured() {
332 tracing::warn!("Not starting SSE proxy because servers failed to start");
333 }
334 false
335 };
336
337 (server_result, proxy_started)
338 }
339
340 /// Stop a running server
341 ///
342 /// This method is instrumented with `tracing`.
343 #[tracing::instrument(skip(self), fields(server_id = %id))]
344 pub async fn stop_server(&mut self, id: ServerId) -> Result<()> {
345 tracing::info!("Attempting to stop server");
346 if let Some(mut server) = self.servers.remove(&id) {
347 let name = server.name().to_string();
348 tracing::debug!(server_name = %name, "Found server process to stop");
349 self.server_names.remove(&name);
350
351 server.stop().await.map_err(|e| {
352 tracing::error!(error = %e, "Failed to stop server process");
353 e
354 })?;
355
356 // Notify SSE proxy about the server being stopped
357 if let Some(proxy) = &self.sse_proxy_handle {
358 if let Err(e) = proxy.update_server_info(&name, None, "Stopped").await {
359 tracing::warn!(
360 error = %e,
361 server = %name,
362 "Failed to update SSE proxy with server stopped status"
363 );
364 } else {
365 tracing::debug!(server = %name, "Updated SSE proxy with server stopped status");
366 }
367 }
368
369 tracing::info!("Server stopped successfully");
370 Ok(())
371 } else {
372 tracing::warn!("Attempted to stop a server that was not found or not running");
373 Err(Error::ServerNotFound(format!("{:?}", id)))
374 }
375 }
376
377 /// Stop all running servers and the SSE proxy if it's running
378 ///
379 /// This method stops all running servers and the SSE proxy (if running).
380 /// It collects all errors but only returns the first one encountered.
381 ///
382 /// # Returns
383 ///
384 /// A `Result<()>` indicating success or the first error encountered.
385 ///
386 /// # Examples
387 ///
388 /// ```no_run
389 /// use mcp_runner::McpRunner;
390 ///
391 /// #[tokio::main]
392 /// async fn main() -> mcp_runner::Result<()> {
393 /// let mut runner = McpRunner::from_config_file("config.json")?;
394 /// runner.start_all_with_proxy().await;
395 ///
396 /// // Later, stop everything
397 /// runner.stop_all_servers().await?;
398 /// println!("All servers and proxy stopped");
399 ///
400 /// Ok(())
401 /// }
402 /// ```
403 ///
404 /// This method is instrumented with `tracing`.
405 #[tracing::instrument(skip(self))]
406 pub async fn stop_all_servers(&mut self) -> Result<()> {
407 tracing::info!("Stopping all servers and proxy if running");
408
409 // Collect all server IDs first to avoid borrowing issues
410 let server_ids: Vec<ServerId> = self.servers.keys().copied().collect();
411
412 // Stop the SSE proxy first if it's running
413 if let Some(proxy_handle) = self.sse_proxy_handle.take() {
414 tracing::info!("Stopping SSE proxy");
415 if let Err(e) = proxy_handle.shutdown().await {
416 tracing::warn!(error = %e, "Error shutting down SSE proxy");
417 // We continue anyway since we're in the process of clean-up
418 }
419 tracing::info!("SSE proxy stopped");
420 }
421
422 // Stop servers sequentially but with improved error handling
423 let mut errors = Vec::new();
424
425 for id in server_ids {
426 match self.stop_server(id).await {
427 Ok(_) => {}
428 Err(e) => {
429 tracing::error!(server_id = ?id, error = %e, "Failed to stop server");
430 errors.push((id, e));
431 }
432 }
433 }
434
435 if errors.is_empty() {
436 tracing::info!("All servers stopped successfully");
437 Ok(())
438 } else {
439 tracing::warn!(error_count = errors.len(), "Some servers failed to stop");
440 // Create an aggregate error message including all failures
441 if errors.len() == 1 {
442 return Err(errors.remove(0).1);
443 } else {
444 let error_msg = errors
445 .iter()
446 .map(|(id, e)| format!("{:?}: {}", id, e))
447 .collect::<Vec<_>>()
448 .join("; ");
449 return Err(Error::Other(format!(
450 "Multiple servers failed to stop: {}",
451 error_msg
452 )));
453 }
454 }
455 }
456
457 /// Get server status
458 ///
459 /// This method is instrumented with `tracing`.
460 #[tracing::instrument(skip(self), fields(server_id = %id))]
461 pub fn server_status(&self, id: ServerId) -> Result<ServerStatus> {
462 tracing::debug!("Getting server status");
463 self.servers
464 .get(&id)
465 .map(|server| {
466 let status = server.status();
467 tracing::trace!(status = ?status);
468 status
469 })
470 .ok_or_else(|| {
471 tracing::warn!("Status requested for unknown server");
472 Error::ServerNotFound(format!("{:?}", id))
473 })
474 }
475
476 /// Get server ID by name
477 ///
478 /// This method is instrumented with `tracing`.
479 #[tracing::instrument(skip(self), fields(server_name = %name))]
480 pub fn get_server_id(&self, name: &str) -> Result<ServerId> {
481 tracing::debug!("Getting server ID by name");
482 self.server_names.get(name).copied().ok_or_else(|| {
483 tracing::warn!("Server ID requested for unknown server name");
484 Error::ServerNotFound(name.to_string())
485 })
486 }
487
488 /// Get a client for a server
489 ///
490 /// This method is instrumented with `tracing`.
491 ///
492 /// If the client already exists in cache, a `ClientAlreadyCached` error is returned.
493 /// In this case, you can retrieve the cached client using specialized methods like
494 /// `get_server_tools` that handle the cache internally.
495 #[tracing::instrument(skip(self), fields(server_id = %id))]
496 pub fn get_client(&mut self, id: ServerId) -> Result<McpClient> {
497 tracing::info!("Getting client for server");
498
499 // First check if we already have a client for this server
500 if let Some(Some(_client)) = self.clients.get(&id) {
501 tracing::debug!("Client already exists in cache");
502 // Return a specific error type for this case
503 return Err(Error::ClientAlreadyCached);
504 }
505
506 // Check if we've already tried to get a client but it failed
507 if let Some(None) = self.clients.get(&id) {
508 tracing::warn!("Previously failed to create client for this server");
509 return Err(Error::ServerNotFound(format!(
510 "{:?} (client creation previously failed)",
511 id
512 )));
513 }
514
515 // Create a new client
516 let server = self.servers.get_mut(&id).ok_or_else(|| {
517 tracing::error!("Client requested for unknown or stopped server");
518 Error::ServerNotFound(format!("{:?}", id))
519 })?;
520 let server_name = server.name().to_string();
521 tracing::debug!(server_name = %server_name, "Found server process");
522
523 tracing::debug!("Taking stdin/stdout from server process");
524 let stdin = match server.take_stdin() {
525 Ok(stdin) => stdin,
526 Err(e) => {
527 tracing::error!(error = %e, "Failed to take stdin from server");
528 // Mark this server as failed in our clients cache
529 self.clients.insert(id, None);
530 return Err(e);
531 }
532 };
533
534 let stdout = match server.take_stdout() {
535 Ok(stdout) => stdout,
536 Err(e) => {
537 tracing::error!(error = %e, "Failed to take stdout from server");
538 // Mark this server as failed in our clients cache
539 self.clients.insert(id, None);
540 return Err(e);
541 }
542 };
543
544 tracing::debug!("Creating StdioTransport and McpClient");
545 let transport = StdioTransport::new(server_name.clone(), stdin, stdout);
546 let client = McpClient::new(server_name, transport);
547
548 // Store the client in our cache
549 self.clients.insert(id, Some(client.clone()));
550
551 tracing::info!("Client created successfully");
552 Ok(client)
553 }
554
555 /// Start the SSE proxy server if enabled in configuration
556 ///
557 /// This method is instrumented with `tracing`.
558 #[tracing::instrument(skip(self))]
559 pub async fn start_sse_proxy(&mut self) -> Result<()> {
560 if let Some(proxy_config) = &self.config.sse_proxy {
561 tracing::info!("Initializing SSE proxy server");
562
563 // Create the runner access functions
564 let runner_access = SSEProxyRunnerAccess {
565 get_server_id: Arc::new({
566 let self_clone = self.clone(); // Clone self to move into the closure
567 move |name: &str| self_clone.get_server_id(name)
568 }),
569 get_client: Arc::new({
570 let self_clone = self.clone(); // Clone self to move into the closure
571 move |id: ServerId| {
572 // We need a mutable reference to self, which we can't have in a closure
573 // Create a new client using the same logic as get_client, but without caching
574 let servers = &self_clone.servers;
575 if let Some(server) = servers.get(&id) {
576 // We can't actually take stdin/stdout from the server in this closure because we only
577 // have a shared reference. Instead, we'll create a new client to talk to the existing server.
578 // This is inefficient but necessary for the proxy's design.
579 let server_name = server.name().to_string();
580 match McpClient::connect(&server_name, &self_clone.config) {
581 Ok(client) => Ok(client),
582 Err(e) => {
583 tracing::error!(error = %e, server_id = ?id, "Failed to create client for SSE proxy");
584 Err(e)
585 }
586 }
587 } else {
588 Err(Error::ServerNotFound(format!("{:?}", id)))
589 }
590 }
591 }),
592 get_allowed_servers: Arc::new({
593 let config = self.config.clone(); // Clone config to move into the closure
594 move || {
595 // Extract the allowed_servers from the sse_proxy config if present
596 config
597 .sse_proxy
598 .as_ref()
599 .and_then(|proxy_config| proxy_config.allowed_servers.clone())
600 }
601 }),
602 get_server_config_keys: Arc::new({
603 let config = self.config.clone(); // Clone config to move into the closure
604 move || {
605 // Return all server names from the config
606 config.mcp_servers.keys().cloned().collect()
607 }
608 }),
609 };
610
611 // Convert the config reference to an owned value
612 let proxy_config_owned = proxy_config.clone();
613
614 // Start the proxy with the runner access functions
615 let proxy_handle = SSEProxy::start_proxy(runner_access, proxy_config_owned).await?;
616
617 // Store the proxy handle
618 self.sse_proxy_handle = Some(proxy_handle);
619
620 tracing::info!(
621 "SSE proxy server started on {}:{}",
622 proxy_config.address,
623 proxy_config.port
624 );
625 Ok(())
626 } else {
627 tracing::warn!("SSE proxy not configured, skipping start");
628 Err(Error::Other(
629 "SSE proxy not configured in config".to_string(),
630 ))
631 }
632 }
633
634 /// Check if the SSE proxy is enabled in configuration
635 ///
636 /// This method is instrumented with `tracing`.
637 #[tracing::instrument(skip(self))]
638 pub fn is_sse_proxy_configured(&self) -> bool {
639 self.config.sse_proxy.is_some()
640 }
641
642 /// Get the SSE proxy configuration if it exists
643 ///
644 /// Retrieves a reference to the SSE proxy configuration from the runner's config.
645 /// This is useful for accessing proxy settings like address and port.
646 ///
647 /// # Returns
648 ///
649 /// A `Result` containing a reference to the `SSEProxyConfig` if configured,
650 /// or an `Error::Other` if no SSE proxy is configured.
651 ///
652 /// # Examples
653 ///
654 /// ```no_run
655 /// use mcp_runner::McpRunner;
656 ///
657 /// #[tokio::main]
658 /// async fn main() -> mcp_runner::Result<()> {
659 /// let runner = McpRunner::from_config_file("config.json")?;
660 ///
661 /// if runner.is_sse_proxy_configured() {
662 /// let proxy_config = runner.get_sse_proxy_config()?;
663 /// println!("SSE proxy will listen on {}:{}", proxy_config.address, proxy_config.port);
664 /// }
665 ///
666 /// Ok(())
667 /// }
668 /// ```
669 ///
670 /// This method is instrumented with `tracing`.
671 #[tracing::instrument(skip(self))]
672 pub fn get_sse_proxy_config(&self) -> Result<&config::SSEProxyConfig> {
673 tracing::debug!("Getting SSE proxy configuration");
674 self.config.sse_proxy.as_ref().ok_or_else(|| {
675 tracing::warn!("SSE proxy configuration requested but not configured");
676 Error::Other("SSE proxy not configured".to_string())
677 })
678 }
679
680 /// Get the running SSE proxy handle if it exists
681 ///
682 /// This method provides access to the running SSE proxy handle, which can be used
683 /// to communicate with the proxy or control it.
684 /// Note that this will only return a value if the proxy was previously started
685 /// with `start_sse_proxy()` or `start_all_with_proxy()`.
686 ///
687 /// # Returns
688 ///
689 /// A `Result` containing a reference to the running `SSEProxyHandle` instance,
690 /// or an `Error::Other` if no SSE proxy is running.
691 ///
692 /// # Examples
693 ///
694 /// ```no_run
695 /// use mcp_runner::McpRunner;
696 ///
697 /// #[tokio::main]
698 /// async fn main() -> mcp_runner::Result<()> {
699 /// let mut runner = McpRunner::from_config_file("config.json")?;
700 ///
701 /// // Start servers and proxy
702 /// let (server_ids, proxy_started) = runner.start_all_with_proxy().await;
703 /// let _server_ids = server_ids?;
704 ///
705 /// if proxy_started {
706 /// // Access the running proxy handle
707 /// let proxy_handle = runner.get_sse_proxy_handle()?;
708 /// let config = proxy_handle.config();
709 /// println!("SSE proxy is running on {}:{}", config.address, config.port);
710 /// }
711 ///
712 /// Ok(())
713 /// }
714 /// ```
715 ///
716 /// This method is instrumented with `tracing`.
717 #[tracing::instrument(skip(self))]
718 pub fn get_sse_proxy_handle(&self) -> Result<&SSEProxyHandle> {
719 tracing::debug!("Getting SSE proxy handle");
720 self.sse_proxy_handle.as_ref().ok_or_else(|| {
721 tracing::warn!("SSE proxy handle requested but no proxy is running");
722 Error::Other("SSE proxy not running".to_string())
723 })
724 }
725
726 /// Get status for all running servers
727 ///
728 /// This method returns a HashMap of server names to their current status.
729 /// This is a convenience method that can be called at any time to check on all servers.
730 ///
731 /// # Returns
732 ///
733 /// A `HashMap<String, ServerStatus>` containing the status of all currently running servers.
734 ///
735 /// # Examples
736 ///
737 /// ```no_run
738 /// use mcp_runner::McpRunner;
739 ///
740 /// #[tokio::main]
741 /// async fn main() -> mcp_runner::Result<()> {
742 /// let mut runner = McpRunner::from_config_file("config.json")?;
743 /// runner.start_all_servers().await?;
744 ///
745 /// // Check status of all servers
746 /// let statuses = runner.get_all_server_statuses();
747 /// for (name, status) in statuses {
748 /// println!("Server '{}' status: {:?}", name, status);
749 /// }
750 ///
751 /// Ok(())
752 /// }
753 /// ```
754 ///
755 /// This method is instrumented with `tracing`.
756 #[tracing::instrument(skip(self))]
757 pub fn get_all_server_statuses(&self) -> HashMap<String, ServerStatus> {
758 tracing::debug!("Getting status for all running servers");
759 let mut statuses = HashMap::new();
760
761 for (server_name, server_id) in &self.server_names {
762 if let Some(server) = self.servers.get(server_id) {
763 let status = server.status();
764 statuses.insert(server_name.clone(), status);
765 tracing::trace!(server = %server_name, status = ?status);
766 }
767 }
768
769 tracing::debug!(num_servers = statuses.len(), "Collected server statuses");
770 statuses
771 }
772
773 /// Get a list of available tools for a specific server
774 ///
775 /// This is a convenience method that creates a temporary client to query tools from a server.
776 /// Unlike `get_client().list_tools()`, this method handles all the client creation and cleanup internally.
777 ///
778 /// # Returns
779 ///
780 /// A `Result` containing a vector of tools provided by the specified server.
781 ///
782 /// # Examples
783 ///
784 /// ```no_run
785 /// use mcp_runner::McpRunner;
786 ///
787 /// #[tokio::main]
788 /// async fn main() -> mcp_runner::Result<()> {
789 /// let mut runner = McpRunner::from_config_file("config.json")?;
790 /// runner.start_server("fetch").await?;
791 ///
792 /// // Get tools for a specific server
793 /// let tools = runner.get_server_tools("fetch").await?;
794 /// for tool in tools {
795 /// println!("Tool: {} - {}", tool.name, tool.description);
796 /// }
797 ///
798 /// Ok(())
799 /// }
800 /// ```
801 ///
802 /// This method is instrumented with `tracing`.
803 #[tracing::instrument(skip(self), fields(server_name = %name))]
804 pub async fn get_server_tools(&mut self, name: &str) -> Result<Vec<client::Tool>> {
805 tracing::info!("Getting tools for server '{}'", name);
806
807 // Get server ID
808 let server_id = self.get_server_id(name)?;
809
810 // Check if we already have a client for this server
811 let client_from_cache = if let Some(Some(_client)) = self.clients.get(&server_id) {
812 tracing::debug!("Using cached client");
813 // Return a specific error type for this case
814 true
815 } else {
816 false
817 };
818
819 // Get or create client
820 let result: Result<Vec<client::Tool>> = if client_from_cache {
821 // Use cached client
822 let client = self.clients.get(&server_id).unwrap().as_ref().unwrap();
823
824 // Initialize the client
825 client.initialize().await.map_err(|e| {
826 tracing::error!(error = %e, "Failed to initialize client");
827 e
828 })?;
829
830 // List tools
831 client.list_tools().await.map_err(|e| {
832 tracing::error!(error = %e, "Failed to list tools for server");
833 e
834 })
835 } else {
836 // Create a new client
837 match self.get_client(server_id) {
838 Ok(client) => {
839 // Initialize the client
840 client.initialize().await.map_err(|e| {
841 tracing::error!(error = %e, "Failed to initialize client");
842 e
843 })?;
844
845 // List tools
846 client.list_tools().await.map_err(|e| {
847 tracing::error!(error = %e, "Failed to list tools for server");
848 e
849 })
850 }
851 Err(e) => {
852 tracing::error!(error = %e, "Failed to get client");
853 Err(e)
854 }
855 }
856 };
857
858 match &result {
859 Ok(tools) => {
860 let tools_len = tools.len();
861 tracing::info!(server = %name, num_tools = tools_len, "Successfully retrieved tools");
862 }
863 Err(e) => {
864 tracing::error!(server = %name, error = %e, "Failed to get tools");
865 }
866 }
867
868 result
869 }
870
871 /// Get tools for all running servers
872 ///
873 /// This method returns a HashMap of server names to their available tools.
874 /// This is a convenience method that can be called at any time to check tools for all running servers.
875 ///
876 /// # Returns
877 ///
878 /// A `HashMap<String, Result<Vec<Tool>>>` containing the tools of all currently running servers.
879 /// The Result indicates whether listing tools was successful for each server.
880 ///
881 /// # Examples
882 ///
883 /// ```no_run
884 /// use mcp_runner::McpRunner;
885 ///
886 /// #[tokio::main]
887 /// async fn main() -> mcp_runner::Result<()> {
888 /// let mut runner = McpRunner::from_config_file("config.json")?;
889 /// runner.start_all_servers().await?;
890 ///
891 /// // Get tools for all servers
892 /// let all_tools = runner.get_all_server_tools().await;
893 /// for (server_name, tools_result) in all_tools {
894 /// match tools_result {
895 /// Ok(tools) => {
896 /// println!("Server '{}' tools:", server_name);
897 /// for tool in tools {
898 /// println!(" - {}: {}", tool.name, tool.description);
899 /// }
900 /// },
901 /// Err(e) => println!("Failed to get tools for '{}': {}", server_name, e),
902 /// }
903 /// }
904 ///
905 /// Ok(())
906 /// }
907 /// ```
908 ///
909 /// This method is instrumented with `tracing`.
910 #[tracing::instrument(skip(self))]
911 pub async fn get_all_server_tools(&mut self) -> HashMap<String, Result<Vec<client::Tool>>> {
912 tracing::debug!("Getting tools for all running servers");
913 let mut all_tools = HashMap::new();
914
915 // Need to collect keys to avoid borrowing issues
916 let server_names: Vec<String> = self.server_names.keys().cloned().collect();
917
918 // Process each server sequentially with timeout protection
919 for name in server_names {
920 tracing::debug!(server = %name, "Getting tools");
921 // For each server, get its tools with a timeout to prevent hanging
922 let result = tokio::time::timeout(
923 std::time::Duration::from_secs(15),
924 self.get_server_tools(&name),
925 )
926 .await;
927
928 // Process the result, handling timeout case separately
929 let final_result = match result {
930 Ok(inner_result) => inner_result,
931 Err(_) => {
932 tracing::warn!(server = %name, "Timed out getting tools");
933 Err(Error::Timeout(format!(
934 "Tool listing for server '{}' timed out",
935 name
936 )))
937 }
938 };
939
940 // Store the result (success or error) in the map
941 all_tools.insert(name, final_result);
942 }
943
944 tracing::debug!(
945 num_servers = all_tools.len(),
946 "Collected tools for all servers"
947 );
948 all_tools
949 }
950
951 /// Create a snapshot of current server information
952 ///
953 /// This creates a HashMap of server names to their ServerInfo which can be used
954 /// by the SSE proxy to report accurate server status information.
955 ///
956 /// This method is instrumented with `tracing`.
957 #[tracing::instrument(skip(self))]
958 fn get_server_info_snapshot(&self) -> HashMap<String, ServerInfo> {
959 tracing::debug!("Creating server information snapshot for SSE proxy");
960 let mut server_info = HashMap::new();
961
962 for (name, id) in &self.server_names {
963 if let Some(server) = self.servers.get(id) {
964 let status = server.status();
965 server_info.insert(
966 name.clone(),
967 ServerInfo {
968 name: name.clone(),
969 id: format!("{:?}", id),
970 status: format!("{:?}", status),
971 },
972 );
973 tracing::trace!(server = %name, status = ?status, "Added server to snapshot");
974 }
975 }
976
977 server_info
978 }
979}
980
981impl Clone for McpRunner {
982 fn clone(&self) -> Self {
983 Self {
984 config: self.config.clone(),
985 servers: self.servers.clone(),
986 server_names: self.server_names.clone(),
987 sse_proxy_handle: self.sse_proxy_handle.clone(),
988 clients: HashMap::new(), // We don't clone clients as they can't be cleanly cloned
989 }
990 }
991}