1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use adk_core::AdkError;
12use tokio::sync::RwLock;
13use tokio_util::sync::CancellationToken;
14
15use super::super::elicitation::{AutoDeclineElicitationHandler, ElicitationHandler};
16use super::super::toolset::McpToolset;
17use super::config::McpServerConfig;
18use super::entry::{BackoffState, McpServerEntry};
19use super::status::ServerStatus;
20
/// Manages the lifecycle of a set of MCP servers: creation from config,
/// start/stop/restart, background health monitoring with backoff-based
/// auto-restart, and shutdown.
#[allow(dead_code)] pub struct McpServerManager {
    /// Per-server state (config, status, live toolset, backoff), keyed by
    /// the user-supplied server ID. Shared with the spawned monitor task.
    pub(crate) servers: Arc<RwLock<HashMap<String, McpServerEntry>>>,

    /// Handler for MCP elicitation requests. When `None`, servers are
    /// started with an `AutoDeclineElicitationHandler` fallback.
    pub(crate) elicitation_handler: Option<Arc<dyn ElicitationHandler>>,

    /// Optional handler for MCP sampling requests (feature-gated).
    #[cfg(feature = "mcp-sampling")]
    pub(crate) sampling_handler: Option<Arc<dyn crate::sampling::SamplingHandler>>,

    /// Interval between health-check passes of the monitor task
    /// (default 30s; see `with_health_check_interval`).
    pub(crate) health_check_interval: Duration,

    // NOTE(review): `grace_period` is stored and builder-configurable
    // (`with_grace_period`) but never read anywhere in this file — confirm
    // it is consumed elsewhere (e.g. for graceful child shutdown) or remove.
    pub(crate) grace_period: Duration,

    /// Cancellation token used to stop the background monitor task.
    pub(crate) monitor_cancel: CancellationToken,

    /// Human-readable name for this manager instance.
    pub(crate) name: String,
}
76
impl McpServerManager {
    /// Builds a manager from pre-parsed configs without spawning anything.
    ///
    /// Each config becomes an [`McpServerEntry`] whose initial status is
    /// `Disabled` when `config.disabled` is set, otherwise `Stopped`.
    /// Servers are only started via `start_server`/`start_all`.
    pub fn new(configs: HashMap<String, McpServerConfig>) -> Self {
        let servers: HashMap<String, McpServerEntry> = configs
            .into_iter()
            .map(|(id, config)| {
                let status =
                    if config.disabled { ServerStatus::Disabled } else { ServerStatus::Stopped };
                let backoff = BackoffState::new(&config.restart_policy);
                let entry = McpServerEntry { config, status, toolset: None, child: None, backoff };
                (id, entry)
            })
            .collect();

        Self {
            servers: Arc::new(RwLock::new(servers)),
            elicitation_handler: None,
            #[cfg(feature = "mcp-sampling")]
            sampling_handler: None,
            health_check_interval: Duration::from_secs(30),
            grace_period: Duration::from_secs(5),
            monitor_cancel: CancellationToken::new(),
            name: "mcp_server_manager".to_string(),
        }
    }

    /// Parses a JSON config document into a manager.
    ///
    /// # Errors
    /// Returns a tool error when the JSON does not deserialize into
    /// [`super::config::McpJsonFile`] (e.g. malformed JSON or a missing
    /// required field such as `command`).
    pub fn from_json(json: &str) -> adk_core::Result<Self> {
        let file: super::config::McpJsonFile = serde_json::from_str(json)
            .map_err(|e| AdkError::tool(format!("failed to parse MCP server config: {e}")))?;
        Ok(Self::new(file.mcp_servers))
    }

    /// Reads a JSON config file from disk and parses it via [`Self::from_json`].
    ///
    /// # Errors
    /// Returns a tool error when the file cannot be read (the error message
    /// includes the path) or when parsing fails.
    pub fn from_json_file(path: impl AsRef<std::path::Path>) -> adk_core::Result<Self> {
        let path = path.as_ref();
        let content = std::fs::read_to_string(path).map_err(|e| {
            AdkError::tool(format!("failed to read config file '{}': {e}", path.display()))
        })?;
        Self::from_json(&content)
    }

    /// Builder: sets the elicitation handler used for all servers started
    /// after this call.
    pub fn with_elicitation_handler(mut self, handler: Arc<dyn ElicitationHandler>) -> Self {
        self.elicitation_handler = Some(handler);
        self
    }

    /// Builder: sets the sampling handler (feature-gated).
    #[cfg(feature = "mcp-sampling")]
    pub fn with_sampling_handler(
        mut self,
        handler: Arc<dyn crate::sampling::SamplingHandler>,
    ) -> Self {
        self.sampling_handler = Some(handler);
        self
    }

    /// Builder: overrides the health-check interval (default 30s).
    pub fn with_health_check_interval(mut self, interval: Duration) -> Self {
        self.health_check_interval = interval;
        self
    }

    /// Builder: overrides the grace period (default 5s).
    // NOTE(review): see the struct field — `grace_period` is not read in
    // this file.
    pub fn with_grace_period(mut self, period: Duration) -> Self {
        self.grace_period = period;
        self
    }

    /// Builder: overrides the manager's name (default "mcp_server_manager").
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.name = name.into();
        self
    }

    /// Starts the server with the given ID.
    ///
    /// No-op (Ok) if the server is already `Running`. Note that a `Disabled`
    /// status does not block this path — only `Running` short-circuits.
    ///
    /// # Errors
    /// Unknown ID, spawn failure, or MCP handshake failure.
    // NOTE(review): the write lock on `servers` is held across the spawn +
    // handshake await inside `start_server_inner`, blocking all other
    // manager operations for the duration of the handshake.
    pub async fn start_server(&self, id: &str) -> adk_core::Result<()> {
        let mut servers = self.servers.write().await;
        let entry = servers
            .get_mut(id)
            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;

        Self::start_server_inner(
            id,
            entry,
            &self.elicitation_handler,
            #[cfg(feature = "mcp-sampling")]
            &self.sampling_handler,
        )
        .await
    }

    /// Spawns the server process and performs the MCP handshake.
    ///
    /// Associated fn (not a method) so it can be called both from `&self`
    /// methods and from the detached monitor task, which only has clones of
    /// the handlers. Caller must already hold the write lock that produced
    /// `entry`. On any failure the entry's status is set to `FailedToStart`
    /// before the error is returned.
    async fn start_server_inner(
        id: &str,
        entry: &mut McpServerEntry,
        elicitation_handler: &Option<Arc<dyn ElicitationHandler>>,
        #[cfg(feature = "mcp-sampling")] sampling_handler: &Option<
            Arc<dyn crate::sampling::SamplingHandler>,
        >,
    ) -> adk_core::Result<()> {
        // Idempotent for already-running servers; a `Restarting` status
        // intentionally falls through so restart_server can proceed.
        if entry.status == ServerStatus::Running {
            return Ok(());
        }

        let config = &entry.config;

        let mut cmd = tokio::process::Command::new(&config.command);
        cmd.args(&config.args);
        cmd.envs(&config.env);

        let transport = rmcp::transport::TokioChildProcess::new(cmd).map_err(|e| {
            entry.status = ServerStatus::FailedToStart;
            AdkError::tool(format!(
                "failed to spawn server '{id}': command '{}' not found. Verify it is installed and on PATH: {e}",
                config.command
            ))
        })?;

        // Fall back to auto-decline when no elicitation handler was configured.
        let handler: Arc<dyn ElicitationHandler> =
            elicitation_handler.clone().unwrap_or_else(|| Arc::new(AutoDeclineElicitationHandler));

        #[cfg(feature = "mcp-sampling")]
        let toolset_result = if let Some(sampling) = sampling_handler {
            McpToolset::with_sampling_handler(transport, handler, Arc::clone(sampling)).await
        } else {
            McpToolset::with_elicitation_handler(transport, handler).await
        };

        #[cfg(not(feature = "mcp-sampling"))]
        let toolset_result = McpToolset::with_elicitation_handler(transport, handler).await;

        let toolset = toolset_result.map_err(|e| {
            entry.status = ServerStatus::FailedToStart;
            AdkError::tool(format!("MCP handshake failed for server '{id}': {e}"))
        })?;

        entry.status = ServerStatus::Running;
        entry.toolset = Some(toolset);
        // Child handle is cleared: presumably the transport owns the child
        // process — TODO confirm against TokioChildProcess semantics.
        entry.child = None;
        tracing::info!(
            server.id = id,
            server.command = config.command,
            server.args = ?config.args,
            "started MCP server"
        );

        Ok(())
    }

    /// Stops the server with the given ID (reason "manual").
    ///
    /// # Errors
    /// Unknown server ID. Stopping a server that is not running is Ok.
    pub async fn stop_server(&self, id: &str) -> adk_core::Result<()> {
        let mut servers = self.servers.write().await;
        let entry = servers
            .get_mut(id)
            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;

        Self::stop_server_inner(id, entry, "manual").await;
        Ok(())
    }

    /// Cancels the toolset's token and drops the toolset/child handles.
    ///
    /// Only acts on `Running` or `Restarting` entries; all other statuses
    /// return immediately. A `Restarting` status is preserved so that
    /// `restart_server` can continue; otherwise status becomes `Stopped`.
    async fn stop_server_inner(id: &str, entry: &mut McpServerEntry, reason: &str) {
        if entry.status != ServerStatus::Running && entry.status != ServerStatus::Restarting {
            return;
        }

        // Signal the toolset's background machinery to shut down.
        if let Some(ref toolset) = entry.toolset {
            let cancel_token = toolset.cancellation_token().await;
            cancel_token.cancel();
        }

        entry.toolset = None;
        entry.child = None;

        if entry.status != ServerStatus::Restarting {
            entry.status = ServerStatus::Stopped;
        }

        tracing::info!(server.id = id, stop.reason = reason, "stopped MCP server");
    }

    /// Stops and immediately re-starts the server with the given ID.
    ///
    /// Sets status to `Restarting` first so `stop_server_inner` acts even on
    /// non-running entries and does not downgrade the status to `Stopped`,
    /// and so `start_server_inner` does not short-circuit.
    ///
    /// # Errors
    /// Unknown ID, or any error from the start phase.
    pub async fn restart_server(&self, id: &str) -> adk_core::Result<()> {
        let mut servers = self.servers.write().await;
        let entry = servers
            .get_mut(id)
            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;

        entry.status = ServerStatus::Restarting;

        Self::stop_server_inner(id, entry, "restart").await;

        Self::start_server_inner(
            id,
            entry,
            &self.elicitation_handler,
            #[cfg(feature = "mcp-sampling")]
            &self.sampling_handler,
        )
        .await
    }

    /// Returns the current status of one server.
    ///
    /// # Errors
    /// Unknown server ID.
    pub async fn server_status(&self, id: &str) -> adk_core::Result<ServerStatus> {
        let servers = self.servers.read().await;
        servers
            .get(id)
            .map(|entry| entry.status)
            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))
    }

    /// Returns a snapshot of every server's status, keyed by ID.
    pub async fn all_statuses(&self) -> HashMap<String, ServerStatus> {
        let servers = self.servers.read().await;
        servers.iter().map(|(id, entry)| (id.clone(), entry.status)).collect()
    }

    /// Counts servers currently in the `Running` state.
    pub async fn running_server_count(&self) -> usize {
        let servers = self.servers.read().await;
        servers.values().filter(|entry| entry.status == ServerStatus::Running).count()
    }

    /// Spawns the background health-monitor task.
    ///
    /// Every `health_check_interval` the task scans for `Running` entries
    /// whose toolset is closed (or missing) and, per the server's restart
    /// policy, either auto-restarts them with backoff or marks them
    /// `FailedToStart` after too many attempts. The task runs until
    /// `monitor_cancel` is cancelled (see `stop_monitoring`/`shutdown`).
    ///
    /// The task holds no lock across the health-check interval; each phase
    /// (detect, mark crashed, compute backoff, restart) re-acquires the lock
    /// and re-checks the entry, since state may change between phases.
    // NOTE(review): the task is single-threaded over all servers, so the
    // per-server backoff `sleep` below delays health checks and restarts for
    // every other server; and the write lock is held across the handshake
    // await inside `start_server_inner` during a restart.
    pub fn start_monitoring(&self) {
        let servers = Arc::clone(&self.servers);
        let cancel = self.monitor_cancel.clone();
        let interval = self.health_check_interval;
        let elicitation_handler = self.elicitation_handler.clone();
        #[cfg(feature = "mcp-sampling")]
        let sampling_handler = self.sampling_handler.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        tracing::info!("health monitor stopped");
                        break;
                    }
                    _ = tokio::time::sleep(interval) => {
                        // Phase 1 (read lock): detect crashed servers. A
                        // `Running` entry with no toolset also counts as
                        // crashed.
                        let crashed_ids: Vec<String> = {
                            let servers = servers.read().await;
                            let mut crashed = Vec::new();
                            for (id, entry) in servers.iter() {
                                if entry.status != ServerStatus::Running {
                                    continue;
                                }
                                if let Some(ref toolset) = entry.toolset {
                                    if toolset.is_closed().await {
                                        crashed.push(id.clone());
                                    }
                                } else {
                                    crashed.push(id.clone());
                                }
                            }
                            crashed
                        };

                        if crashed_ids.is_empty() {
                            continue;
                        }

                        for id in crashed_ids {
                            // Phase 2 (write lock): re-check and mark the
                            // entry `Crashed`; capture its restart policy.
                            // `continue` here skips to the next crashed ID.
                            let restart_info = {
                                let mut servers = servers.write().await;
                                if let Some(entry) = servers.get_mut(&id) {
                                    if entry.status != ServerStatus::Running {
                                        // Status changed since detection
                                        // (e.g. manual stop) — leave it be.
                                        continue;
                                    }

                                    tracing::warn!(
                                        server.id = id,
                                        failure.reason = "connection closed",
                                        "health check failed"
                                    );

                                    entry.status = ServerStatus::Crashed;
                                    entry.toolset = None;
                                    entry.child = None;

                                    entry.config.restart_policy.clone()
                                } else {
                                    continue;
                                }
                            };

                            // No restart policy => server stays `Crashed`.
                            if let Some(ref policy) = restart_info {
                                // A vanished entry counts as "exceeded" so we
                                // bail out instead of restarting a removed
                                // server.
                                let exceeded = {
                                    let servers = servers.read().await;
                                    servers.get(&id)
                                        .map(|e| e.backoff.exceeded_max_attempts(policy))
                                        .unwrap_or(true)
                                };

                                if exceeded {
                                    let mut servers = servers.write().await;
                                    if let Some(entry) = servers.get_mut(&id) {
                                        tracing::error!(
                                            server.id = id,
                                            restart.total_attempts = entry.backoff.consecutive_failures,
                                            "max restart attempts exceeded, giving up"
                                        );
                                        entry.status = ServerStatus::FailedToStart;
                                    }
                                    continue;
                                }

                                // Phase 3 (write lock): advance the backoff
                                // state and compute the delay before retry.
                                let (delay_ms, attempt) = {
                                    let mut servers = servers.write().await;
                                    if let Some(entry) = servers.get_mut(&id) {
                                        let attempt = entry.backoff.consecutive_failures + 1;
                                        let delay = entry.backoff.next_delay(policy);
                                        (delay, attempt)
                                    } else {
                                        continue;
                                    }
                                };

                                tracing::info!(
                                    server.id = id,
                                    restart.attempt = attempt,
                                    restart.delay_ms = delay_ms,
                                    "auto-restarting crashed server after backoff"
                                );

                                // Backoff sleep happens without the lock held.
                                tokio::time::sleep(Duration::from_millis(delay_ms)).await;

                                // Cancellation may have arrived during the
                                // sleep; break the for-loop, the outer select
                                // then observes `cancelled()` and exits.
                                if cancel.is_cancelled() {
                                    break;
                                }

                                // Phase 4 (write lock): attempt the restart.
                                let restart_result = {
                                    let mut servers = servers.write().await;
                                    if let Some(entry) = servers.get_mut(&id) {
                                        entry.status = ServerStatus::Restarting;
                                        Self::start_server_inner(
                                            &id,
                                            entry,
                                            &elicitation_handler,
                                            #[cfg(feature = "mcp-sampling")]
                                            &sampling_handler,
                                        )
                                        .await
                                    } else {
                                        continue;
                                    }
                                };

                                match restart_result {
                                    Ok(()) => {
                                        // Success clears the consecutive-
                                        // failure counter.
                                        let mut servers = servers.write().await;
                                        if let Some(entry) = servers.get_mut(&id) {
                                            entry.backoff.reset(policy);
                                            tracing::info!(
                                                server.id = id,
                                                "auto-restart succeeded"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        // Failure leaves the entry in
                                        // `FailedToStart` (set by
                                        // start_server_inner); the next pass
                                        // will not retry it since it is no
                                        // longer `Running`.
                                        tracing::warn!(
                                            server.id = id,
                                            error = %e,
                                            "auto-restart failed"
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });
    }

    /// Signals the background monitor task (if any) to stop. Idempotent.
    pub fn stop_monitoring(&self) {
        self.monitor_cancel.cancel();
    }

    /// Registers a new server config at runtime without starting it.
    ///
    /// # Errors
    /// Returns an error when `id` already exists.
    pub async fn add_server(&self, id: String, config: McpServerConfig) -> adk_core::Result<()> {
        let mut servers = self.servers.write().await;
        if servers.contains_key(&id) {
            return Err(AdkError::tool(format!("server ID '{id}' already exists")));
        }
        let status = if config.disabled { ServerStatus::Disabled } else { ServerStatus::Stopped };
        let backoff = BackoffState::new(&config.restart_policy);
        let entry = McpServerEntry { config, status, toolset: None, child: None, backoff };
        servers.insert(id, entry);
        Ok(())
    }

    /// Stops (reason "removal") and removes a server.
    ///
    /// # Errors
    /// Unknown server ID.
    pub async fn remove_server(&self, id: &str) -> adk_core::Result<()> {
        let mut servers = self.servers.write().await;
        let entry = servers
            .get_mut(id)
            .ok_or_else(|| AdkError::tool(format!("unknown server ID: '{id}'")))?;

        Self::stop_server_inner(id, entry, "removal").await;

        servers.remove(id);
        Ok(())
    }

    /// Starts every non-disabled server concurrently.
    ///
    /// Returns a per-server result map; individual failures are logged and
    /// recorded but do not abort the other starts. Note each start still
    /// serializes on the manager's write lock inside `start_server`.
    pub async fn start_all(&self) -> HashMap<String, adk_core::Result<()>> {
        let ids_to_start: Vec<String> = {
            let servers = self.servers.read().await;
            servers
                .iter()
                .filter(|(_, entry)| !entry.config.disabled)
                .map(|(id, _)| id.clone())
                .collect()
        };

        let futures: Vec<_> = ids_to_start
            .iter()
            .map(|id| {
                let id = id.clone();
                async move {
                    let result = self.start_server(&id).await;
                    if let Err(ref e) = result {
                        tracing::error!(
                            server.id = id,
                            error = %e,
                            "failed to start server during start_all"
                        );
                    }
                    (id, result)
                }
            })
            .collect();

        futures::future::join_all(futures).await.into_iter().collect()
    }

    /// Stops monitoring and all running servers, then normalizes every
    /// non-disabled entry to `Stopped` (including `Crashed`/`FailedToStart`).
    ///
    /// # Errors
    /// Currently always returns Ok; the Result is kept for API stability.
    pub async fn shutdown(&self) -> adk_core::Result<()> {
        self.stop_monitoring();

        let mut servers = self.servers.write().await;
        let ids: Vec<String> = servers
            .iter()
            .filter(|(_, entry)| entry.status == ServerStatus::Running)
            .map(|(id, _)| id.clone())
            .collect();

        for id in &ids {
            if let Some(entry) = servers.get_mut(id) {
                Self::stop_server_inner(id, entry, "shutdown").await;
            }
        }

        // Sweep any remaining non-disabled statuses to `Stopped`.
        for entry in servers.values_mut() {
            if entry.status != ServerStatus::Disabled {
                entry.status = ServerStatus::Stopped;
            }
        }

        Ok(())
    }
}
805
806impl Drop for McpServerManager {
807 fn drop(&mut self) {
808 if let Ok(servers) = self.servers.try_read() {
810 let running = servers.values().filter(|e| e.status == ServerStatus::Running).count();
811 if running > 0 {
812 tracing::warn!(
813 running_count = running,
814 "McpServerManager dropped with {running} servers still running. \
815 Call shutdown() before dropping to ensure clean process cleanup."
816 );
817 }
818 }
819 }
820}
821
822const _: () = {
825 fn _assert_send<T: Send>() {}
826 fn _assert_sync<T: Sync>() {}
827 fn _assert_send_sync() {
828 _assert_send::<McpServerManager>();
829 _assert_sync::<McpServerManager>();
830 }
831};
832
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal `McpServerConfig` for tests. Only `command` and
    /// `disabled` vary across these tests; everything else is empty/None.
    /// Extracted to remove ~14 copies of the same 8-line struct literal.
    fn test_config(command: &str, disabled: bool) -> McpServerConfig {
        McpServerConfig {
            command: command.to_string(),
            args: vec![],
            env: HashMap::new(),
            disabled,
            auto_approve: vec![],
            restart_policy: None,
        }
    }

    #[test]
    fn test_new_empty_configs() {
        let manager = McpServerManager::new(HashMap::new());
        assert_eq!(manager.name, "mcp_server_manager");
        assert_eq!(manager.health_check_interval, Duration::from_secs(30));
        assert_eq!(manager.grace_period, Duration::from_secs(5));
        assert!(manager.elicitation_handler.is_none());
    }

    #[test]
    fn test_new_disabled_server_gets_disabled_status() {
        let configs =
            HashMap::from([("disabled-server".to_string(), test_config("echo", true))]);
        let manager = McpServerManager::new(configs);
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers["disabled-server"].status, ServerStatus::Disabled);
    }

    #[test]
    fn test_new_enabled_server_gets_stopped_status() {
        let configs =
            HashMap::from([("enabled-server".to_string(), test_config("echo", false))]);
        let manager = McpServerManager::new(configs);
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers["enabled-server"].status, ServerStatus::Stopped);
    }

    #[test]
    fn test_builder_with_name() {
        let manager = McpServerManager::new(HashMap::new()).with_name("custom_name");
        assert_eq!(manager.name, "custom_name");
    }

    #[test]
    fn test_builder_with_health_check_interval() {
        let manager = McpServerManager::new(HashMap::new())
            .with_health_check_interval(Duration::from_secs(10));
        assert_eq!(manager.health_check_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_builder_with_grace_period() {
        let manager =
            McpServerManager::new(HashMap::new()).with_grace_period(Duration::from_secs(2));
        assert_eq!(manager.grace_period, Duration::from_secs(2));
    }

    #[test]
    fn test_builder_with_elicitation_handler() {
        // AutoDeclineElicitationHandler is already in scope via `use super::*`.
        let handler: Arc<dyn ElicitationHandler> = Arc::new(AutoDeclineElicitationHandler);
        let manager = McpServerManager::new(HashMap::new()).with_elicitation_handler(handler);
        assert!(manager.elicitation_handler.is_some());
    }

    #[tokio::test]
    async fn test_server_status_returns_correct_status() {
        let configs = HashMap::from([("server-a".to_string(), test_config("echo", false))]);
        let manager = McpServerManager::new(configs);
        let status = manager.server_status("server-a").await.unwrap();
        assert_eq!(status, ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_server_status_unknown_id_returns_error() {
        let manager = McpServerManager::new(HashMap::new());
        let result = manager.server_status("nonexistent").await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("unknown server ID: 'nonexistent'"));
    }

    #[tokio::test]
    async fn test_all_statuses_returns_all_servers() {
        let configs = HashMap::from([
            ("server-a".to_string(), test_config("echo", false)),
            ("server-b".to_string(), test_config("echo", true)),
        ]);
        let manager = McpServerManager::new(configs);
        let statuses = manager.all_statuses().await;
        assert_eq!(statuses.len(), 2);
        assert_eq!(statuses["server-a"], ServerStatus::Stopped);
        assert_eq!(statuses["server-b"], ServerStatus::Disabled);
    }

    #[tokio::test]
    async fn test_all_statuses_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        let statuses = manager.all_statuses().await;
        assert!(statuses.is_empty());
    }

    #[tokio::test]
    async fn test_running_server_count_no_running() {
        let configs = HashMap::from([("server-a".to_string(), test_config("echo", false))]);
        let manager = McpServerManager::new(configs);
        assert_eq!(manager.running_server_count().await, 0);
    }

    #[tokio::test]
    async fn test_running_server_count_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        assert_eq!(manager.running_server_count().await, 0);
    }

    #[tokio::test]
    async fn test_start_all_skips_disabled_servers() {
        let configs = HashMap::from([
            // Enabled server with a command that cannot exist on PATH.
            ("enabled".to_string(), test_config("nonexistent-command-xyz", false)),
            ("disabled".to_string(), test_config("echo", true)),
        ]);
        let manager = McpServerManager::new(configs);
        let results = manager.start_all().await;

        // Only the enabled server is attempted.
        assert!(results.contains_key("enabled"));
        assert!(!results.contains_key("disabled"));

        // The enabled server fails to spawn.
        assert!(results["enabled"].is_err());

        // The disabled server is untouched.
        let status = manager.server_status("disabled").await.unwrap();
        assert_eq!(status, ServerStatus::Disabled);
    }

    #[tokio::test]
    async fn test_start_all_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        let results = manager.start_all().await;
        assert!(results.is_empty());
    }

    #[test]
    fn test_from_json_valid() {
        let json = r#"{
            "mcpServers": {
                "filesystem": {
                    "command": "npx",
                    "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
                    "env": { "NODE_ENV": "production" },
                    "disabled": false,
                    "autoApprove": ["read_file", "list_directory"]
                },
                "github": {
                    "command": "npx",
                    "args": ["-y", "@modelcontextprotocol/server-github"],
                    "env": { "GITHUB_TOKEN": "ghp_xxx" },
                    "disabled": true,
                    "autoApprove": []
                }
            }
        }"#;
        let manager = McpServerManager::from_json(json).unwrap();
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers.len(), 2);

        let fs_entry = &servers["filesystem"];
        assert_eq!(fs_entry.config.command, "npx");
        assert_eq!(
            fs_entry.config.args,
            vec!["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        );
        assert_eq!(fs_entry.config.env["NODE_ENV"], "production");
        assert!(!fs_entry.config.disabled);
        assert_eq!(fs_entry.config.auto_approve, vec!["read_file", "list_directory"]);
        assert_eq!(fs_entry.status, ServerStatus::Stopped);

        let gh_entry = &servers["github"];
        assert_eq!(gh_entry.config.command, "npx");
        assert!(gh_entry.config.disabled);
        assert_eq!(gh_entry.status, ServerStatus::Disabled);
    }

    #[test]
    fn test_from_json_malformed() {
        let json = r#"{ this is not valid json }"#;
        let result = McpServerManager::from_json(json);
        let err = result.err().expect("should fail on malformed JSON");
        let err_msg = format!("{err}");
        assert!(
            err_msg.contains("failed to parse MCP server config"),
            "error message was: {err_msg}"
        );
    }

    #[test]
    fn test_from_json_missing_command() {
        let json = r#"{
            "mcpServers": {
                "bad-server": {
                    "args": ["--flag"]
                }
            }
        }"#;
        let result = McpServerManager::from_json(json);
        let err = result.err().expect("should fail on missing command field");
        let err_msg = format!("{err}");
        assert!(
            err_msg.contains("failed to parse MCP server config"),
            "error message was: {err_msg}"
        );
    }

    #[test]
    fn test_from_json_file_not_found() {
        let result = McpServerManager::from_json_file("/nonexistent/path/mcp.json");
        let err = result.err().expect("should fail on nonexistent file");
        let err_msg = format!("{err}");
        assert!(err_msg.contains("failed to read config file"), "error message was: {err_msg}");
        assert!(
            err_msg.contains("/nonexistent/path/mcp.json"),
            "error message should contain the path: {err_msg}"
        );
    }

    #[test]
    fn test_mixed_disabled_and_enabled_servers() {
        let configs = HashMap::from([
            ("server-a".to_string(), test_config("cmd-a", false)),
            ("server-b".to_string(), test_config("cmd-b", true)),
            ("server-c".to_string(), test_config("cmd-c", false)),
        ]);
        let manager = McpServerManager::new(configs);
        let servers = manager.servers.try_read().unwrap();
        assert_eq!(servers["server-a"].status, ServerStatus::Stopped);
        assert_eq!(servers["server-b"].status, ServerStatus::Disabled);
        assert_eq!(servers["server-c"].status, ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_add_server_success() {
        let manager = McpServerManager::new(HashMap::new());
        let mut config = test_config("echo", false);
        config.args = vec!["hello".to_string()];
        let result = manager.add_server("new-server".to_string(), config).await;
        assert!(result.is_ok());

        let status = manager.server_status("new-server").await.unwrap();
        assert_eq!(status, ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_add_server_duplicate_id() {
        let configs = HashMap::from([("existing".to_string(), test_config("echo", false))]);
        let manager = McpServerManager::new(configs);

        let result = manager.add_server("existing".to_string(), test_config("echo", false)).await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("server ID 'existing' already exists"));
    }

    #[tokio::test]
    async fn test_add_server_disabled() {
        let manager = McpServerManager::new(HashMap::new());
        let result =
            manager.add_server("disabled-server".to_string(), test_config("echo", true)).await;
        assert!(result.is_ok());

        let status = manager.server_status("disabled-server").await.unwrap();
        assert_eq!(status, ServerStatus::Disabled);
    }

    #[tokio::test]
    async fn test_remove_server_success() {
        let configs = HashMap::from([("to-remove".to_string(), test_config("echo", false))]);
        let manager = McpServerManager::new(configs);

        assert!(manager.server_status("to-remove").await.is_ok());

        let result = manager.remove_server("to-remove").await;
        assert!(result.is_ok());

        let status_result = manager.server_status("to-remove").await;
        assert!(status_result.is_err());
    }

    #[tokio::test]
    async fn test_remove_server_unknown_id() {
        let manager = McpServerManager::new(HashMap::new());
        let result = manager.remove_server("nonexistent").await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("unknown server ID: 'nonexistent'"));
    }

    #[tokio::test]
    async fn test_shutdown_sets_all_to_stopped() {
        let configs = HashMap::from([
            ("server-a".to_string(), test_config("echo", false)),
            ("server-b".to_string(), test_config("echo", true)),
            ("server-c".to_string(), test_config("echo", false)),
        ]);
        let manager = McpServerManager::new(configs);

        // Force a non-Stopped, non-Disabled status to verify the sweep.
        {
            let mut servers = manager.servers.write().await;
            servers.get_mut("server-a").unwrap().status = ServerStatus::FailedToStart;
        }

        let result = manager.shutdown().await;
        assert!(result.is_ok());

        let statuses = manager.all_statuses().await;
        assert_eq!(statuses["server-a"], ServerStatus::Stopped);
        assert_eq!(statuses["server-b"], ServerStatus::Disabled);
        assert_eq!(statuses["server-c"], ServerStatus::Stopped);
    }

    #[tokio::test]
    async fn test_shutdown_stops_monitoring() {
        let manager = McpServerManager::new(HashMap::new());

        manager.start_monitoring();

        let result = manager.shutdown().await;
        assert!(result.is_ok());

        assert!(manager.monitor_cancel.is_cancelled());
    }

    #[tokio::test]
    async fn test_shutdown_empty_manager() {
        let manager = McpServerManager::new(HashMap::new());
        let result = manager.shutdown().await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_drop_no_warning_when_no_running_servers() {
        // Drop should be silent when nothing is running; this just exercises
        // the Drop path without panicking.
        let configs = HashMap::from([("server-a".to_string(), test_config("echo", false))]);
        let manager = McpServerManager::new(configs);
        drop(manager);
    }
}