Skip to main content

adk_tool/mcp/manager/
manager.rs

1//! McpServerManager implementation.
2//!
3//! This module contains the main [`McpServerManager`] struct and its
4//! construction/builder methods, as well as lifecycle methods (start, stop,
5//! restart) for individual servers.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use adk_core::AdkError;
12use tokio::sync::RwLock;
13use tokio_util::sync::CancellationToken;
14
15use super::super::elicitation::{AutoDeclineElicitationHandler, ElicitationHandler};
16use super::super::toolset::McpToolset;
17use super::config::McpServerConfig;
18use super::entry::{BackoffState, McpServerEntry};
19use super::status::ServerStatus;
20
/// Manages the full lifecycle of multiple local MCP server child processes.
///
/// `McpServerManager` spawns processes, connects them via `TokioChildProcess`
/// transport into [`McpToolset`](super::super::McpToolset) instances, monitors
/// health, auto-restarts on crash with exponential backoff, and aggregates tools
/// from all managed servers behind the [`Toolset`](adk_core::Toolset) trait.
///
/// # Construction
///
/// Use [`McpServerManager::new`] with a map of server configurations, then chain
/// builder methods to configure handlers and intervals:
///
/// ```rust,ignore
/// use adk_tool::mcp::manager::{McpServerConfig, McpServerManager};
/// use std::collections::HashMap;
/// use std::time::Duration;
///
/// let configs = HashMap::from([
///     ("my-server".to_string(), McpServerConfig {
///         command: "npx".to_string(),
///         args: vec!["-y".to_string(), "@modelcontextprotocol/server-filesystem".to_string()],
///         ..Default::default()
///     }),
/// ]);
///
/// let manager = McpServerManager::new(configs)
///     .with_health_check_interval(Duration::from_secs(15))
///     .with_grace_period(Duration::from_secs(3))
///     .with_name("my_manager");
/// ```
#[allow(dead_code)] // Fields used by lifecycle methods in later tasks
pub struct McpServerManager {
    /// Thread-safe map of server ID to per-server state.
    ///
    /// The `Arc` is cloned into the background task spawned by
    /// `start_monitoring`, so the map is shared between the manager and the
    /// health monitor.
    pub(crate) servers: Arc<RwLock<HashMap<String, McpServerEntry>>>,

    /// Optional elicitation handler shared across all managed server connections.
    pub(crate) elicitation_handler: Option<Arc<dyn ElicitationHandler>>,

    /// Optional sampling handler shared across all managed server connections.
    /// Only available when the `mcp-sampling` feature is enabled.
    #[cfg(feature = "mcp-sampling")]
    pub(crate) sampling_handler: Option<Arc<dyn crate::sampling::SamplingHandler>>,

    /// Interval between health check cycles. Default: 30 seconds.
    pub(crate) health_check_interval: Duration,

    /// Grace period to wait for a child process to exit before force-killing. Default: 5 seconds.
    ///
    /// NOTE(review): not yet consulted by the stop path in this file
    /// (`stop_server_inner` cancels and drops immediately) — presumably wired
    /// up by the "later tasks" referenced in the `allow(dead_code)` above.
    pub(crate) grace_period: Duration,

    /// Cancellation token used to stop the health monitoring background task.
    pub(crate) monitor_cancel: CancellationToken,

    /// Name returned by the `Toolset::name()` implementation. Default: `"mcp_server_manager"`.
    pub(crate) name: String,
}
76
77impl McpServerManager {
78    /// Create a new `McpServerManager` from a map of server configurations.
79    ///
80    /// Each entry is keyed by a unique server ID. Servers with `disabled: true`
81    /// are initialized with [`ServerStatus::Disabled`]; all others start as
82    /// [`ServerStatus::Stopped`].
83    ///
84    /// No servers are started automatically — call [`start_server`](Self::start_server)
85    /// or [`start_all`](Self::start_all) to begin spawning processes.
86    pub fn new(configs: HashMap<String, McpServerConfig>) -> Self {
87        let servers: HashMap<String, McpServerEntry> = configs
88            .into_iter()
89            .map(|(id, config)| {
90                let status =
91                    if config.disabled { ServerStatus::Disabled } else { ServerStatus::Stopped };
92                let backoff = BackoffState::new(&config.restart_policy);
93                let entry = McpServerEntry { config, status, toolset: None, child: None, backoff };
94                (id, entry)
95            })
96            .collect();
97
98        Self {
99            servers: Arc::new(RwLock::new(servers)),
100            elicitation_handler: None,
101            #[cfg(feature = "mcp-sampling")]
102            sampling_handler: None,
103            health_check_interval: Duration::from_secs(30),
104            grace_period: Duration::from_secs(5),
105            monitor_cancel: CancellationToken::new(),
106            name: "mcp_server_manager".to_string(),
107        }
108    }
109
110    /// Create a new `McpServerManager` by parsing a JSON string in Kiro `mcp.json` format.
111    ///
112    /// The JSON must contain a top-level `mcpServers` object mapping server IDs
113    /// to their configurations. CamelCase JSON field names are automatically
114    /// mapped to snake_case Rust fields.
115    ///
116    /// # Errors
117    ///
118    /// Returns `AdkError::Tool` if the JSON is malformed or missing required fields.
119    ///
120    /// # Example
121    ///
122    /// ```rust,ignore
123    /// let json = r#"{
124    ///     "mcpServers": {
125    ///         "filesystem": {
126    ///             "command": "npx",
127    ///             "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
128    ///         }
129    ///     }
130    /// }"#;
131    /// let manager = McpServerManager::from_json(json)?;
132    /// ```
133    pub fn from_json(json: &str) -> adk_core::Result<Self> {
134        let file: super::config::McpJsonFile = serde_json::from_str(json)
135            .map_err(|e| AdkError::tool(format!("failed to parse MCP server config: {e}")))?;
136        Ok(Self::new(file.mcp_servers))
137    }
138
139    /// Create a new `McpServerManager` by reading and parsing a JSON file from disk.
140    ///
141    /// The file must contain JSON in Kiro `mcp.json` format (see [`from_json`](Self::from_json)).
142    /// File reading is synchronous, which is acceptable for config loading at startup.
143    ///
144    /// # Errors
145    ///
146    /// Returns `AdkError::Tool` if the file cannot be read or the JSON is malformed.
147    ///
148    /// # Example
149    ///
150    /// ```rust,ignore
151    /// let manager = McpServerManager::from_json_file("mcp.json")?;
152    /// ```
153    pub fn from_json_file(path: impl AsRef<std::path::Path>) -> adk_core::Result<Self> {
154        let path = path.as_ref();
155        let content = std::fs::read_to_string(path).map_err(|e| {
156            AdkError::tool(format!("failed to read config file '{}': {e}", path.display()))
157        })?;
158        Self::from_json(&content)
159    }
160
161    /// Set the elicitation handler used for all managed server connections.
162    ///
163    /// The handler is preserved across server restarts via `Arc` sharing.
164    pub fn with_elicitation_handler(mut self, handler: Arc<dyn ElicitationHandler>) -> Self {
165        self.elicitation_handler = Some(handler);
166        self
167    }
168
169    /// Set the sampling handler used for all managed server connections.
170    ///
171    /// The handler is preserved across server restarts via `Arc` sharing.
172    /// Only available when the `mcp-sampling` feature is enabled.
173    #[cfg(feature = "mcp-sampling")]
174    pub fn with_sampling_handler(
175        mut self,
176        handler: Arc<dyn crate::sampling::SamplingHandler>,
177    ) -> Self {
178        self.sampling_handler = Some(handler);
179        self
180    }
181
182    /// Set the interval between health check cycles.
183    ///
184    /// Default: 30 seconds.
185    pub fn with_health_check_interval(mut self, interval: Duration) -> Self {
186        self.health_check_interval = interval;
187        self
188    }
189
190    /// Set the grace period to wait for a child process to exit before force-killing.
191    ///
192    /// Default: 5 seconds.
193    pub fn with_grace_period(mut self, period: Duration) -> Self {
194        self.grace_period = period;
195        self
196    }
197
198    /// Set the name returned by the `Toolset::name()` implementation.
199    ///
200    /// Default: `"mcp_server_manager"`.
201    pub fn with_name(mut self, name: impl Into<String>) -> Self {
202        self.name = name.into();
203        self
204    }
205
206    /// Start a managed MCP server by ID.
207    ///
208    /// Spawns the configured command as a child process, creates a
209    /// `TokioChildProcess` transport, and connects via `McpToolset` with the
210    /// configured elicitation (and optionally sampling) handler.
211    ///
212    /// If the server is already `Running`, this is a no-op and returns `Ok(())`.
213    ///
214    /// # Errors
215    ///
216    /// Returns `AdkError::Tool` if:
217    /// - The server ID does not exist
218    /// - The child process fails to spawn
219    /// - The MCP handshake fails
220    ///
221    /// # Example
222    ///
223    /// ```rust,ignore
224    /// manager.start_server("my-server").await?;
225    /// ```
226    pub async fn start_server(&self, id: &str) -> adk_core::Result<()> {
227        let mut servers = self.servers.write().await;
228        let entry = servers
229            .get_mut(id)
230            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;
231
232        Self::start_server_inner(
233            id,
234            entry,
235            &self.elicitation_handler,
236            #[cfg(feature = "mcp-sampling")]
237            &self.sampling_handler,
238        )
239        .await
240    }
241
242    /// Internal start logic operating on a mutable entry reference.
243    ///
244    /// This avoids double-locking when called from `restart_server`.
245    async fn start_server_inner(
246        id: &str,
247        entry: &mut McpServerEntry,
248        elicitation_handler: &Option<Arc<dyn ElicitationHandler>>,
249        #[cfg(feature = "mcp-sampling")] sampling_handler: &Option<
250            Arc<dyn crate::sampling::SamplingHandler>,
251        >,
252    ) -> adk_core::Result<()> {
253        // If already running, nothing to do
254        if entry.status == ServerStatus::Running {
255            return Ok(());
256        }
257
258        let config = &entry.config;
259
260        // Build the command
261        let mut cmd = tokio::process::Command::new(&config.command);
262        cmd.args(&config.args);
263        cmd.envs(&config.env);
264
265        // Create transport — TokioChildProcess::new spawns the child internally
266        let transport = rmcp::transport::TokioChildProcess::new(cmd).map_err(|e| {
267            entry.status = ServerStatus::FailedToStart;
268            AdkError::tool(format!(
269                "failed to spawn server '{id}': command '{}' not found. Verify it is installed and on PATH: {e}",
270                config.command
271            ))
272        })?;
273
274        // Connect via McpToolset with the appropriate handler
275        let handler: Arc<dyn ElicitationHandler> =
276            elicitation_handler.clone().unwrap_or_else(|| Arc::new(AutoDeclineElicitationHandler));
277
278        #[cfg(feature = "mcp-sampling")]
279        let toolset_result = if let Some(sampling) = sampling_handler {
280            McpToolset::with_sampling_handler(transport, handler, Arc::clone(sampling)).await
281        } else {
282            McpToolset::with_elicitation_handler(transport, handler).await
283        };
284
285        #[cfg(not(feature = "mcp-sampling"))]
286        let toolset_result = McpToolset::with_elicitation_handler(transport, handler).await;
287
288        let toolset = toolset_result.map_err(|e| {
289            entry.status = ServerStatus::FailedToStart;
290            AdkError::tool(format!("MCP handshake failed for server '{id}': {e}"))
291        })?;
292
293        // Success — update entry
294        entry.status = ServerStatus::Running;
295        entry.toolset = Some(toolset);
296        entry.child = None; // Child is owned by the transport/toolset
297
298        tracing::info!(
299            server.id = id,
300            server.command = config.command,
301            server.args = ?config.args,
302            "started MCP server"
303        );
304
305        Ok(())
306    }
307
308    /// Stop a managed MCP server by ID.
309    ///
310    /// Cancels the MCP session via the toolset's cancellation token, drops the
311    /// `McpToolset` connection, and sets the status to `Stopped`.
312    ///
313    /// If the server is not running, this is a no-op and returns `Ok(())`.
314    ///
315    /// # Errors
316    ///
317    /// Returns `AdkError::Tool` if the server ID does not exist.
318    ///
319    /// # Example
320    ///
321    /// ```rust,ignore
322    /// manager.stop_server("my-server").await?;
323    /// ```
324    pub async fn stop_server(&self, id: &str) -> adk_core::Result<()> {
325        let mut servers = self.servers.write().await;
326        let entry = servers
327            .get_mut(id)
328            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;
329
330        Self::stop_server_inner(id, entry, "manual").await;
331        Ok(())
332    }
333
334    /// Internal stop logic operating on a mutable entry reference.
335    ///
336    /// This avoids double-locking when called from `restart_server`.
337    async fn stop_server_inner(id: &str, entry: &mut McpServerEntry, reason: &str) {
338        // If not running, nothing to do
339        if entry.status != ServerStatus::Running && entry.status != ServerStatus::Restarting {
340            return;
341        }
342
343        // Cancel the MCP session and drop the toolset
344        if let Some(ref toolset) = entry.toolset {
345            let cancel_token = toolset.cancellation_token().await;
346            cancel_token.cancel();
347        }
348
349        // Drop the toolset — this cleans up the transport and child process
350        entry.toolset = None;
351        entry.child = None;
352
353        // Only set to Stopped if we're not in a Restarting transition
354        if entry.status != ServerStatus::Restarting {
355            entry.status = ServerStatus::Stopped;
356        }
357
358        tracing::info!(server.id = id, stop.reason = reason, "stopped MCP server");
359    }
360
361    /// Restart a managed MCP server by ID.
362    ///
363    /// Sets the status to `Restarting`, stops the server, then starts it again.
364    /// The same `ElicitationHandler` and `SamplingHandler` `Arc`s are preserved
365    /// across the restart.
366    ///
367    /// # Errors
368    ///
369    /// Returns `AdkError::Tool` if:
370    /// - The server ID does not exist
371    /// - The start phase fails (status set to `FailedToStart`)
372    ///
373    /// # Example
374    ///
375    /// ```rust,ignore
376    /// manager.restart_server("my-server").await?;
377    /// ```
378    pub async fn restart_server(&self, id: &str) -> adk_core::Result<()> {
379        let mut servers = self.servers.write().await;
380        let entry = servers
381            .get_mut(id)
382            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;
383
384        // Set status to Restarting
385        entry.status = ServerStatus::Restarting;
386
387        // Stop the server (inline to avoid double-locking)
388        Self::stop_server_inner(id, entry, "restart").await;
389
390        // Start the server again
391        Self::start_server_inner(
392            id,
393            entry,
394            &self.elicitation_handler,
395            #[cfg(feature = "mcp-sampling")]
396            &self.sampling_handler,
397        )
398        .await
399    }
400
401    /// Return the current [`ServerStatus`] for a given server ID.
402    ///
403    /// # Errors
404    ///
405    /// Returns `AdkError::Tool` if the server ID does not exist.
406    ///
407    /// # Example
408    ///
409    /// ```rust,ignore
410    /// let status = manager.server_status("my-server").await?;
411    /// assert_eq!(status, ServerStatus::Running);
412    /// ```
413    pub async fn server_status(&self, id: &str) -> adk_core::Result<ServerStatus> {
414        let servers = self.servers.read().await;
415        servers
416            .get(id)
417            .map(|entry| entry.status)
418            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))
419    }
420
421    /// Return a map of all server IDs to their current [`ServerStatus`].
422    ///
423    /// # Example
424    ///
425    /// ```rust,ignore
426    /// let statuses = manager.all_statuses().await;
427    /// for (id, status) in &statuses {
428    ///     println!("{id}: {status:?}");
429    /// }
430    /// ```
431    pub async fn all_statuses(&self) -> HashMap<String, ServerStatus> {
432        let servers = self.servers.read().await;
433        servers.iter().map(|(id, entry)| (id.clone(), entry.status)).collect()
434    }
435
436    /// Return the number of servers currently in [`ServerStatus::Running`] status.
437    ///
438    /// # Example
439    ///
440    /// ```rust,ignore
441    /// let count = manager.running_server_count().await;
442    /// println!("{count} servers running");
443    /// ```
444    pub async fn running_server_count(&self) -> usize {
445        let servers = self.servers.read().await;
446        servers.values().filter(|entry| entry.status == ServerStatus::Running).count()
447    }
448
    /// Start the background health monitoring task.
    ///
    /// Spawns a `tokio::spawn` task that periodically checks each `Running`
    /// server by calling [`McpToolset::is_closed()`](super::super::McpToolset::is_closed).
    /// If a server's connection is closed, the monitor sets its status to
    /// `Crashed` and, if a [`RestartPolicy`] is configured, attempts auto-restart
    /// with exponential backoff.
    ///
    /// The monitoring loop runs until [`stop_monitoring`](Self::stop_monitoring)
    /// is called, which cancels the background task via the internal
    /// `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// manager.start_monitoring();
    /// // ... later ...
    /// manager.stop_monitoring();
    /// ```
    pub fn start_monitoring(&self) {
        // Clone everything the task needs so it is 'static: the server map is
        // shared via Arc, the handlers via Arc clones, the token by clone.
        let servers = Arc::clone(&self.servers);
        let cancel = self.monitor_cancel.clone();
        let interval = self.health_check_interval;
        let elicitation_handler = self.elicitation_handler.clone();
        #[cfg(feature = "mcp-sampling")]
        let sampling_handler = self.sampling_handler.clone();

        // NOTE(review): the JoinHandle is dropped, so the task is detached;
        // cancellation is the only way to stop it.
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        tracing::info!("health monitor stopped");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Phase 1: Detect crashed servers under a read lock.
                        // Only IDs are collected here; mutation happens in
                        // phase 2 under a write lock.
                        let crashed_ids: Vec<String> = {
                            let servers = servers.read().await;
                            let mut crashed = Vec::new();
                            for (id, entry) in servers.iter() {
                                if entry.status != ServerStatus::Running {
                                    continue;
                                }
                                if let Some(ref toolset) = entry.toolset {
                                    if toolset.is_closed().await {
                                        crashed.push(id.clone());
                                    }
                                } else {
                                    // No toolset but status is Running — treat as crashed
                                    crashed.push(id.clone());
                                }
                            }
                            crashed
                        };

                        if crashed_ids.is_empty() {
                            continue;
                        }

                        // Phase 2: Mark crashed servers and attempt auto-restart
                        for id in crashed_ids {
                            // Mark as Crashed under write lock
                            let restart_info = {
                                let mut servers = servers.write().await;
                                if let Some(entry) = servers.get_mut(&id) {
                                    // Only process if still Running (could have been
                                    // stopped between read and write lock)
                                    if entry.status != ServerStatus::Running {
                                        continue;
                                    }

                                    tracing::warn!(
                                        server.id = id,
                                        failure.reason = "connection closed",
                                        "health check failed"
                                    );

                                    entry.status = ServerStatus::Crashed;
                                    entry.toolset = None;
                                    entry.child = None;

                                    // Check if auto-restart is configured
                                    entry.config.restart_policy.clone()
                                } else {
                                    continue;
                                }
                            };

                            // Attempt auto-restart if policy allows
                            if let Some(ref policy) = restart_info {
                                // Check if max attempts exceeded. The entry may
                                // have been removed concurrently; treat a
                                // missing entry as "exceeded" so we skip it.
                                let exceeded = {
                                    let servers = servers.read().await;
                                    servers.get(&id)
                                        .map(|e| e.backoff.exceeded_max_attempts(policy))
                                        .unwrap_or(true)
                                };

                                if exceeded {
                                    let mut servers = servers.write().await;
                                    if let Some(entry) = servers.get_mut(&id) {
                                        tracing::error!(
                                            server.id = id,
                                            restart.total_attempts = entry.backoff.consecutive_failures,
                                            "max restart attempts exceeded, giving up"
                                        );
                                        // Terminal state: no further restarts
                                        // until a manual start resets things.
                                        entry.status = ServerStatus::FailedToStart;
                                    }
                                    continue;
                                }

                                // Compute backoff delay and increment failure counter.
                                // NOTE(review): `attempt` assumes next_delay()
                                // increments consecutive_failures — confirm in
                                // BackoffState.
                                let (delay_ms, attempt) = {
                                    let mut servers = servers.write().await;
                                    if let Some(entry) = servers.get_mut(&id) {
                                        let attempt = entry.backoff.consecutive_failures + 1;
                                        let delay = entry.backoff.next_delay(policy);
                                        (delay, attempt)
                                    } else {
                                        continue;
                                    }
                                };

                                tracing::info!(
                                    server.id = id,
                                    restart.attempt = attempt,
                                    restart.delay_ms = delay_ms,
                                    "auto-restarting crashed server after backoff"
                                );

                                // Wait for backoff delay (without holding any lock).
                                // NOTE(review): this sleep runs inline in the
                                // monitor loop, so a long backoff for one server
                                // delays health checks and restarts for all
                                // others until it elapses — consider per-server
                                // restart tasks if that matters.
                                tokio::time::sleep(Duration::from_millis(delay_ms)).await;

                                // Check if monitoring was cancelled during the sleep
                                if cancel.is_cancelled() {
                                    break;
                                }

                                // Attempt restart under write lock
                                let restart_result = {
                                    let mut servers = servers.write().await;
                                    if let Some(entry) = servers.get_mut(&id) {
                                        entry.status = ServerStatus::Restarting;
                                        Self::start_server_inner(
                                            &id,
                                            entry,
                                            &elicitation_handler,
                                            #[cfg(feature = "mcp-sampling")]
                                            &sampling_handler,
                                        )
                                        .await
                                    } else {
                                        continue;
                                    }
                                };

                                match restart_result {
                                    Ok(()) => {
                                        // Reset backoff on success
                                        let mut servers = servers.write().await;
                                        if let Some(entry) = servers.get_mut(&id) {
                                            entry.backoff.reset(policy);
                                            tracing::info!(
                                                server.id = id,
                                                "auto-restart succeeded"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            server.id = id,
                                            error = %e,
                                            "auto-restart failed"
                                        );
                                        // Status already set to FailedToStart by start_server_inner
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });
    }
633
    /// Stop the background health monitoring task.
    ///
    /// Cancels the monitoring loop spawned by [`start_monitoring`](Self::start_monitoring).
    /// This is a no-op if monitoring was never started or has already been stopped.
    ///
    /// NOTE(review): the token is never replaced after cancellation, so calling
    /// `start_monitoring` again after `stop_monitoring` spawns a loop that
    /// observes the already-cancelled token and exits immediately — monitoring
    /// cannot currently be restarted on the same manager. Confirm whether that
    /// is intended.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// manager.stop_monitoring();
    /// ```
    pub fn stop_monitoring(&self) {
        self.monitor_cancel.cancel();
    }
647
648    /// Register a new server configuration at runtime.
649    ///
650    /// The new server is initialized with [`ServerStatus::Disabled`] if
651    /// `config.disabled` is `true`, or [`ServerStatus::Stopped`] otherwise.
652    /// It will not be started automatically — call
653    /// [`start_server`](Self::start_server) to begin spawning the process.
654    ///
655    /// # Errors
656    ///
657    /// Returns `AdkError::Tool` if a server with the given ID already exists.
658    ///
659    /// # Example
660    ///
661    /// ```rust,ignore
662    /// let config = McpServerConfig {
663    ///     command: "npx".to_string(),
664    ///     args: vec!["-y".to_string(), "server".to_string()],
665    ///     ..Default::default()
666    /// };
667    /// manager.add_server("new-server".to_string(), config).await?;
668    /// ```
669    pub async fn add_server(&self, id: String, config: McpServerConfig) -> adk_core::Result<()> {
670        let mut servers = self.servers.write().await;
671        if servers.contains_key(&id) {
672            return Err(AdkError::tool(format!("server ID '{id}' already exists")));
673        }
674        let status = if config.disabled { ServerStatus::Disabled } else { ServerStatus::Stopped };
675        let backoff = BackoffState::new(&config.restart_policy);
676        let entry = McpServerEntry { config, status, toolset: None, child: None, backoff };
677        servers.insert(id, entry);
678        Ok(())
679    }
680
681    /// Remove a server configuration at runtime.
682    ///
683    /// If the server is currently running, it is stopped first using the
684    /// graceful stop sequence before being removed from the manager.
685    ///
686    /// # Errors
687    ///
688    /// Returns `AdkError::Tool` if the server ID does not exist.
689    ///
690    /// # Example
691    ///
692    /// ```rust,ignore
693    /// manager.remove_server("old-server").await?;
694    /// ```
695    pub async fn remove_server(&self, id: &str) -> adk_core::Result<()> {
696        let mut servers = self.servers.write().await;
697        let entry = servers
698            .get_mut(id)
699            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;
700
701        // If the server is running, stop it first
702        Self::stop_server_inner(id, entry, "removal").await;
703
704        servers.remove(id);
705        Ok(())
706    }
707
708    /// Start all non-disabled servers concurrently.
709    ///
710    /// Collects all server IDs where `disabled == false`, then starts each one
711    /// via [`start_server`](Self::start_server). Failures are logged but do not
712    /// prevent other servers from starting.
713    ///
714    /// # Returns
715    ///
716    /// A `HashMap<String, Result<()>>` with per-server outcomes. Disabled servers
717    /// are not included in the result.
718    ///
719    /// # Example
720    ///
721    /// ```rust,ignore
722    /// let results = manager.start_all().await;
723    /// for (id, result) in &results {
724    ///     match result {
725    ///         Ok(()) => println!("{id}: started"),
726    ///         Err(e) => eprintln!("{id}: failed to start: {e}"),
727    ///     }
728    /// }
729    /// ```
730    pub async fn start_all(&self) -> HashMap<String, adk_core::Result<()>> {
731        // Collect IDs of non-disabled servers under a read lock
732        let ids_to_start: Vec<String> = {
733            let servers = self.servers.read().await;
734            servers
735                .iter()
736                .filter(|(_, entry)| !entry.config.disabled)
737                .map(|(id, _)| id.clone())
738                .collect()
739        };
740
741        // Start each server concurrently — each start_server call acquires
742        // its own write lock internally
743        let futures: Vec<_> = ids_to_start
744            .iter()
745            .map(|id| {
746                let id = id.clone();
747                async move {
748                    let result = self.start_server(&id).await;
749                    if let Err(ref e) = result {
750                        tracing::error!(
751                            server.id = id,
752                            error = %e,
753                            "failed to start server during start_all"
754                        );
755                    }
756                    (id, result)
757                }
758            })
759            .collect();
760
761        futures::future::join_all(futures).await.into_iter().collect()
762    }
763
764    /// Shut down all managed servers and stop health monitoring.
765    ///
766    /// This method first stops the health monitoring task, then stops all
767    /// running servers using the graceful stop sequence (cancel token → grace
768    /// period → force-kill). After shutdown, all server statuses are set to
769    /// `Stopped`.
770    ///
771    /// # Example
772    ///
773    /// ```rust,ignore
774    /// manager.shutdown().await?;
775    /// // All servers are now stopped, safe to drop the manager
776    /// ```
777    pub async fn shutdown(&self) -> adk_core::Result<()> {
778        // Step 1: Stop health monitoring first
779        self.stop_monitoring();
780
781        // Step 2: Acquire write lock and stop all running servers
782        let mut servers = self.servers.write().await;
783        let ids: Vec<String> = servers
784            .iter()
785            .filter(|(_, entry)| entry.status == ServerStatus::Running)
786            .map(|(id, _)| id.clone())
787            .collect();
788
789        for id in &ids {
790            if let Some(entry) = servers.get_mut(id) {
791                Self::stop_server_inner(id, entry, "shutdown").await;
792            }
793        }
794
795        // Step 3: Set all server statuses to Stopped (except Disabled)
796        for entry in servers.values_mut() {
797            if entry.status != ServerStatus::Disabled {
798                entry.status = ServerStatus::Stopped;
799            }
800        }
801
802        Ok(())
803    }
804}
805
806impl Drop for McpServerManager {
807    fn drop(&mut self) {
808        // Use try_read() to avoid blocking in Drop
809        if let Ok(servers) = self.servers.try_read() {
810            let running = servers.values().filter(|e| e.status == ServerStatus::Running).count();
811            if running > 0 {
812                tracing::warn!(
813                    running_count = running,
814                    "McpServerManager dropped with {running} servers still running. \
815                     Call shutdown() before dropping to ensure clean process cleanup."
816                );
817            }
818        }
819    }
820}
821
822// Static assertions: McpServerManager must be Send + Sync so it can be
823// shared across async tasks via Arc.
824const _: () = {
825    fn _assert_send<T: Send>() {}
826    fn _assert_sync<T: Sync>() {}
827    fn _assert_send_sync() {
828        _assert_send::<McpServerManager>();
829        _assert_sync::<McpServerManager>();
830    }
831};
832
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal [`McpServerConfig`] for tests: the given command and
    /// disabled flag, with no args, env, auto-approvals, or restart policy.
    ///
    /// Replaces the six-field literal that was previously repeated in almost
    /// every test; variations use struct-update syntax on this base.
    fn config(command: &str, disabled: bool) -> McpServerConfig {
        McpServerConfig {
            command: command.to_string(),
            args: vec![],
            env: HashMap::new(),
            disabled,
            auto_approve: vec![],
            restart_policy: None,
        }
    }

    #[test]
    fn test_new_empty_configs() {
        let manager = McpServerManager::new(HashMap::new());
        assert_eq!(manager.name, "mcp_server_manager");
        assert_eq!(manager.health_check_interval, Duration::from_secs(30));
        assert_eq!(manager.grace_period, Duration::from_secs(5));
        assert!(manager.elicitation_handler.is_none());
    }

    #[test]
    fn test_new_disabled_server_gets_disabled_status() {
        let configs = HashMap::from([("disabled-server".to_string(), config("echo", true))]);
        let manager = McpServerManager::new(configs);
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers["disabled-server"].status, ServerStatus::Disabled);
    }

    #[test]
    fn test_new_enabled_server_gets_stopped_status() {
        let configs = HashMap::from([("enabled-server".to_string(), config("echo", false))]);
        let manager = McpServerManager::new(configs);
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers["enabled-server"].status, ServerStatus::Stopped);
    }

    #[test]
    fn test_builder_with_name() {
        let manager = McpServerManager::new(HashMap::new()).with_name("custom_name");
        assert_eq!(manager.name, "custom_name");
    }

    #[test]
    fn test_builder_with_health_check_interval() {
        let manager = McpServerManager::new(HashMap::new())
            .with_health_check_interval(Duration::from_secs(10));
        assert_eq!(manager.health_check_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_builder_with_grace_period() {
        let manager =
            McpServerManager::new(HashMap::new()).with_grace_period(Duration::from_secs(2));
        assert_eq!(manager.grace_period, Duration::from_secs(2));
    }

    #[test]
    fn test_builder_with_elicitation_handler() {
        // AutoDeclineElicitationHandler is already in scope via `use super::*`
        // (imported at the top of the parent module).
        let handler: Arc<dyn ElicitationHandler> = Arc::new(AutoDeclineElicitationHandler);
        let manager = McpServerManager::new(HashMap::new()).with_elicitation_handler(handler);
        assert!(manager.elicitation_handler.is_some());
    }

    #[tokio::test]
    async fn test_server_status_returns_correct_status() {
        let configs = HashMap::from([("server-a".to_string(), config("echo", false))]);
        let manager = McpServerManager::new(configs);
        let status = manager.server_status("server-a").await.unwrap();
        assert_eq!(status, ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_server_status_unknown_id_returns_error() {
        let manager = McpServerManager::new(HashMap::new());
        let result = manager.server_status("nonexistent").await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("unknown server ID: 'nonexistent'"));
    }

    #[tokio::test]
    async fn test_all_statuses_returns_all_servers() {
        let configs = HashMap::from([
            ("server-a".to_string(), config("echo", false)),
            ("server-b".to_string(), config("echo", true)),
        ]);
        let manager = McpServerManager::new(configs);
        let statuses = manager.all_statuses().await;
        assert_eq!(statuses.len(), 2);
        assert_eq!(statuses["server-a"], ServerStatus::Stopped);
        assert_eq!(statuses["server-b"], ServerStatus::Disabled);
    }

    #[tokio::test]
    async fn test_all_statuses_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        let statuses = manager.all_statuses().await;
        assert!(statuses.is_empty());
    }

    #[tokio::test]
    async fn test_running_server_count_no_running() {
        let configs = HashMap::from([("server-a".to_string(), config("echo", false))]);
        let manager = McpServerManager::new(configs);
        assert_eq!(manager.running_server_count().await, 0);
    }

    #[tokio::test]
    async fn test_running_server_count_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        assert_eq!(manager.running_server_count().await, 0);
    }

    #[tokio::test]
    async fn test_start_all_skips_disabled_servers() {
        let configs = HashMap::from([
            ("enabled".to_string(), config("nonexistent-command-xyz", false)),
            ("disabled".to_string(), config("echo", true)),
        ]);
        let manager = McpServerManager::new(configs);
        let results = manager.start_all().await;

        // Only the enabled server should be in the results
        assert!(results.contains_key("enabled"));
        assert!(!results.contains_key("disabled"));

        // The enabled server should fail (nonexistent command)
        assert!(results["enabled"].is_err());

        // The disabled server should still be Disabled
        let status = manager.server_status("disabled").await.unwrap();
        assert_eq!(status, ServerStatus::Disabled);
    }

    #[tokio::test]
    async fn test_start_all_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        let results = manager.start_all().await;
        assert!(results.is_empty());
    }

    #[test]
    fn test_from_json_valid() {
        let json = r#"{
            "mcpServers": {
                "filesystem": {
                    "command": "npx",
                    "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
                    "env": { "NODE_ENV": "production" },
                    "disabled": false,
                    "autoApprove": ["read_file", "list_directory"]
                },
                "github": {
                    "command": "npx",
                    "args": ["-y", "@modelcontextprotocol/server-github"],
                    "env": { "GITHUB_TOKEN": "ghp_xxx" },
                    "disabled": true,
                    "autoApprove": []
                }
            }
        }"#;
        let manager = McpServerManager::from_json(json).unwrap();
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers.len(), 2);

        let fs_entry = &servers["filesystem"];
        assert_eq!(fs_entry.config.command, "npx");
        assert_eq!(
            fs_entry.config.args,
            vec!["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        );
        assert_eq!(fs_entry.config.env["NODE_ENV"], "production");
        assert!(!fs_entry.config.disabled);
        assert_eq!(fs_entry.config.auto_approve, vec!["read_file", "list_directory"]);
        assert_eq!(fs_entry.status, ServerStatus::Stopped);

        let gh_entry = &servers["github"];
        assert_eq!(gh_entry.config.command, "npx");
        assert!(gh_entry.config.disabled);
        assert_eq!(gh_entry.status, ServerStatus::Disabled);
    }

    #[test]
    fn test_from_json_malformed() {
        let json = r#"{ this is not valid json }"#;
        let result = McpServerManager::from_json(json);
        let err = result.err().expect("should fail on malformed JSON");
        let err_msg = format!("{err}");
        assert!(
            err_msg.contains("failed to parse MCP server config"),
            "error message was: {err_msg}"
        );
    }

    #[test]
    fn test_from_json_missing_command() {
        let json = r#"{
            "mcpServers": {
                "bad-server": {
                    "args": ["--flag"]
                }
            }
        }"#;
        let result = McpServerManager::from_json(json);
        let err = result.err().expect("should fail on missing command field");
        let err_msg = format!("{err}");
        assert!(
            err_msg.contains("failed to parse MCP server config"),
            "error message was: {err_msg}"
        );
    }

    #[test]
    fn test_from_json_file_not_found() {
        let result = McpServerManager::from_json_file("/nonexistent/path/mcp.json");
        let err = result.err().expect("should fail on nonexistent file");
        let err_msg = format!("{err}");
        assert!(err_msg.contains("failed to read config file"), "error message was: {err_msg}");
        assert!(
            err_msg.contains("/nonexistent/path/mcp.json"),
            "error message should contain the path: {err_msg}"
        );
    }

    #[test]
    fn test_mixed_disabled_and_enabled_servers() {
        let configs = HashMap::from([
            ("server-a".to_string(), config("cmd-a", false)),
            ("server-b".to_string(), config("cmd-b", true)),
            ("server-c".to_string(), config("cmd-c", false)),
        ]);
        let manager = McpServerManager::new(configs);
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers["server-a"].status, ServerStatus::Stopped);
        assert_eq!(servers["server-b"].status, ServerStatus::Disabled);
        assert_eq!(servers["server-c"].status, ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_add_server_success() {
        let manager = McpServerManager::new(HashMap::new());
        // Base config plus a single argument, via struct-update syntax.
        let cfg = McpServerConfig { args: vec!["hello".to_string()], ..config("echo", false) };
        let result = manager.add_server("new-server".to_string(), cfg).await;
        assert!(result.is_ok());

        let status = manager.server_status("new-server").await.unwrap();
        assert_eq!(status, ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_add_server_duplicate_id() {
        let configs = HashMap::from([("existing".to_string(), config("echo", false))]);
        let manager = McpServerManager::new(configs);

        let result = manager.add_server("existing".to_string(), config("echo", false)).await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("server ID 'existing' already exists"));
    }

    #[tokio::test]
    async fn test_add_server_disabled() {
        let manager = McpServerManager::new(HashMap::new());
        let result =
            manager.add_server("disabled-server".to_string(), config("echo", true)).await;
        assert!(result.is_ok());

        let status = manager.server_status("disabled-server").await.unwrap();
        assert_eq!(status, ServerStatus::Disabled);
    }

    #[tokio::test]
    async fn test_remove_server_success() {
        let configs = HashMap::from([("to-remove".to_string(), config("echo", false))]);
        let manager = McpServerManager::new(configs);

        // Verify it exists first
        assert!(manager.server_status("to-remove").await.is_ok());

        // Remove it
        let result = manager.remove_server("to-remove").await;
        assert!(result.is_ok());

        // Verify it no longer exists
        let status_result = manager.server_status("to-remove").await;
        assert!(status_result.is_err());
    }

    #[tokio::test]
    async fn test_remove_server_unknown_id() {
        let manager = McpServerManager::new(HashMap::new());
        let result = manager.remove_server("nonexistent").await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("unknown server ID: 'nonexistent'"));
    }

    #[tokio::test]
    async fn test_shutdown_sets_all_to_stopped() {
        // Create a manager with a mix of statuses
        let configs = HashMap::from([
            ("server-a".to_string(), config("echo", false)),
            ("server-b".to_string(), config("echo", true)),
            ("server-c".to_string(), config("echo", false)),
        ]);
        let manager = McpServerManager::new(configs);

        // Manually set server-a to FailedToStart to test that shutdown
        // resets non-disabled statuses to Stopped
        {
            let mut servers = manager.servers.write().await;
            servers.get_mut("server-a").unwrap().status = ServerStatus::FailedToStart;
        }

        // Call shutdown
        let result = manager.shutdown().await;
        assert!(result.is_ok());

        // Verify all non-disabled servers are Stopped
        let statuses = manager.all_statuses().await;
        assert_eq!(statuses["server-a"], ServerStatus::Stopped);
        assert_eq!(statuses["server-b"], ServerStatus::Disabled); // Disabled stays Disabled
        assert_eq!(statuses["server-c"], ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_shutdown_stops_monitoring() {
        let manager = McpServerManager::new(HashMap::new());

        // Start monitoring
        manager.start_monitoring();

        // Shutdown should cancel the monitoring token
        let result = manager.shutdown().await;
        assert!(result.is_ok());

        // Verify the cancellation token is cancelled
        assert!(manager.monitor_cancel.is_cancelled());
    }

    #[tokio::test]
    async fn test_shutdown_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        let result = manager.shutdown().await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_drop_no_warning_when_no_running_servers() {
        // This test verifies Drop doesn't panic when no servers are running.
        // The warning is only logged (not observable in test), but we verify
        // the Drop impl runs without error.
        let configs = HashMap::from([("server-a".to_string(), config("echo", false))]);
        let manager = McpServerManager::new(configs);
        // Drop happens here — should not panic
        drop(manager);
    }
}