1use async_trait::async_trait;
27use chrono::Utc;
28use std::collections::HashMap;
29use std::process::Stdio;
30use std::sync::Arc;
31use std::time::Duration;
32use tokio::io::{AsyncBufReadExt, BufReader};
33use tokio::process::{Child, Command};
34use tokio::sync::{mpsc, Mutex, RwLock};
35
36use crate::mcp::error::{McpError, McpResult};
37use crate::mcp::types::{
38 HealthCheckResult, LifecycleOptions, McpServerConfig, ServerProcess, ServerState, TransportType,
39};
40
/// Events broadcast over the lifecycle event channel (see
/// `LifecycleManager::subscribe`). Only one subscriber is supported at a
/// time; events emitted while no subscriber is registered are dropped.
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
    /// A start request was accepted and spawning is about to begin.
    Starting { server_name: String },
    /// The process was spawned successfully; `pid` is the OS process id
    /// (None if the process already exited when queried).
    Started {
        server_name: String,
        pid: Option<u32>,
    },
    /// A stop request was accepted; `reason` is caller-supplied context.
    Stopping {
        server_name: String,
        reason: Option<String>,
    },
    /// The server finished stopping and its state is now `Stopped`.
    Stopped { server_name: String },
    /// A start attempt failed (spawn error or startup timeout).
    Error { server_name: String, error: String },
    /// The exit monitor observed the process terminate unexpectedly.
    Crashed {
        server_name: String,
        exit_code: Option<i32>,
    },
    /// An automatic restart is about to be attempted after a crash.
    Restarting { server_name: String },
    /// A periodic health check passed.
    HealthOk {
        server_name: String,
        result: HealthCheckResult,
    },
    /// A periodic health check failed.
    HealthFailed {
        server_name: String,
        result: HealthCheckResult,
    },
    /// One line read from the child's captured stdout.
    Stdout { server_name: String, data: String },
    /// One line read from the child's captured stderr.
    Stderr { server_name: String, data: String },
}
82
/// Options accepted by `LifecycleManager::start`.
#[derive(Debug, Clone, Default)]
pub struct StartOptions {
    /// Start even if the server is already in the `Running` state.
    pub force: bool,
    /// Wait until the server reports ready before returning.
    /// NOTE(review): not consulted by the current `start` implementation —
    /// confirm whether this is intentionally unimplemented.
    pub wait_for_ready: bool,
    /// Extra dependencies for this start request.
    /// NOTE(review): also unused by `start`; dependency ordering is driven
    /// by `set_dependencies` instead — verify intended semantics.
    pub dependencies: Vec<String>,
}
93
/// Options accepted by `LifecycleManager::stop`.
#[derive(Debug, Clone, Default)]
pub struct StopOptions {
    /// Kill the process immediately instead of waiting for graceful exit.
    pub force: bool,
    /// Human-readable reason, forwarded in the `Stopping` event.
    pub reason: Option<String>,
}
102
/// Internal bookkeeping for one registered server: its observable state,
/// static configuration, the live child process (if running), and the
/// background tasks that must be aborted when the server stops.
pub(crate) struct ManagedServer {
    // Externally visible process state/metadata (state, pid, timestamps, counters).
    process: ServerProcess,
    // Static configuration captured at registration time.
    config: McpServerConfig,
    // Live child handle; `None` whenever the process is not running.
    child: Option<Child>,
    // Names of servers that must be started before this one.
    dependencies: Vec<String>,
    // stdout/stderr line-forwarding tasks; aborted on stop.
    output_handles: Vec<tokio::task::JoinHandle<()>>,
    // Periodic health-check task; aborted on stop.
    health_check_handle: Option<tokio::task::JoinHandle<()>>,
    // Restart task handle; aborted on stop.
    // NOTE(review): nothing in this file ever assigns `Some` here — confirm
    // whether the exit monitor was meant to store its handle in this field.
    restart_handle: Option<tokio::task::JoinHandle<()>>,
}
120
121impl ManagedServer {
122 fn new(name: String, config: McpServerConfig) -> Self {
123 Self {
124 process: ServerProcess::new(name),
125 config,
126 child: None,
127 dependencies: Vec::new(),
128 output_handles: Vec::new(),
129 health_check_handle: None,
130 restart_handle: None,
131 }
132 }
133}
134
/// Contract for managing the lifecycle of MCP server processes:
/// registration, dependency-ordered start/stop, restart, health checks,
/// state queries, and event subscription.
#[async_trait]
pub trait LifecycleManager: Send + Sync {
    /// Registers `name` with `config`. Non-async; implementations may apply
    /// the registration asynchronously (see `McpLifecycleManager`).
    fn register_server(&self, name: &str, config: McpServerConfig);

    /// Stops (if running) and removes the server from the registry.
    async fn unregister_server(&self, name: &str) -> McpResult<()>;

    /// Declares which servers must start before `name`.
    fn set_dependencies(&self, name: &str, dependencies: Vec<String>);

    /// Starts a single server. `None` options means `StartOptions::default()`.
    async fn start(&self, server_name: &str, options: Option<StartOptions>) -> McpResult<()>;

    /// Starts every registered server in dependency order.
    async fn start_all(&self) -> McpResult<()>;

    /// Recursively starts `server_name`'s dependencies, then the server itself.
    async fn start_with_dependencies(&self, server_name: &str) -> McpResult<()>;

    /// Stops a single server. `None` options means `StopOptions::default()`.
    async fn stop(&self, server_name: &str, options: Option<StopOptions>) -> McpResult<()>;

    /// Stops every server in reverse dependency order.
    async fn stop_all(&self, force: bool) -> McpResult<()>;

    /// Convenience: `stop` followed by `start`.
    async fn restart(&self, server_name: &str) -> McpResult<()>;

    /// Convenience: graceful `stop_all` followed by `start_all`.
    async fn restart_all(&self) -> McpResult<()>;

    /// Checks whether one server's process is alive right now.
    async fn health_check(&self, server_name: &str) -> HealthCheckResult;

    /// Runs `health_check` for every registered server.
    async fn health_check_all(&self) -> HashMap<String, HealthCheckResult>;

    /// Current state; implementations may return a default when unknown.
    fn get_state(&self, server_name: &str) -> ServerState;

    /// Snapshot of one server's process metadata, if registered.
    fn get_process(&self, server_name: &str) -> Option<ServerProcess>;

    /// Snapshot of all registered servers' process metadata.
    fn get_all_processes(&self) -> Vec<ServerProcess>;

    /// True iff the server is currently in the `Running` state.
    fn is_running(&self, server_name: &str) -> bool;

    /// Names of all servers currently in the `Running` state.
    fn get_running_servers(&self) -> Vec<String>;

    /// Subscribes to lifecycle events. See implementation notes regarding
    /// single-subscriber semantics.
    fn subscribe(&self) -> mpsc::Receiver<LifecycleEvent>;

    /// Force-stops everything and clears the registry.
    async fn cleanup(&self) -> McpResult<()>;
}
197
/// Default `LifecycleManager` implementation for stdio-transport MCP
/// servers, built on `tokio::process`.
pub struct McpLifecycleManager {
    /// Registry of managed servers, keyed by server name.
    pub(crate) servers: Arc<RwLock<HashMap<String, ManagedServer>>>,
    /// Timeouts, restart policy, and health-check interval.
    pub options: LifecycleOptions,
    /// Sender half of the event channel; `None` until `subscribe` is called.
    /// Shared with spawned monitor tasks, hence Arc<Mutex<...>>.
    event_tx: Arc<Mutex<Option<mpsc::Sender<LifecycleEvent>>>>,
    /// Whether crashed servers should be scheduled for automatic restart.
    enable_auto_restart: bool,
    /// Whether a periodic health-check task is spawned on start.
    enable_health_checks: bool,
}
211
impl McpLifecycleManager {
    /// Creates a manager with `LifecycleOptions::default()`.
    pub fn new() -> Self {
        Self::with_options(LifecycleOptions::default())
    }

    /// Creates a manager with explicit options. Auto-restart and health
    /// checks start out enabled.
    pub fn with_options(options: LifecycleOptions) -> Self {
        Self {
            servers: Arc::new(RwLock::new(HashMap::new())),
            options,
            event_tx: Arc::new(Mutex::new(None)),
            enable_auto_restart: true,
            enable_health_checks: true,
        }
    }

    /// Enables/disables automatic restart. Only consulted when a server is
    /// subsequently started (the flag is copied into the exit-monitor task).
    pub fn set_auto_restart_enabled(&mut self, enabled: bool) {
        self.enable_auto_restart = enabled;
    }

    /// Enables/disables spawning the periodic health-check monitor on start.
    pub fn set_health_checks_enabled(&mut self, enabled: bool) {
        self.enable_health_checks = enabled;
    }

    /// Sends `event` to the current subscriber, if any. Send failures
    /// (full or closed channel) are deliberately ignored.
    async fn emit_event(&self, event: LifecycleEvent) {
        if let Some(tx) = self.event_tx.lock().await.as_ref() {
            let _ = tx.send(event).await;
        }
    }

    /// Exponential backoff for restart attempt `attempt`:
    /// `restart_delay * 2^min(attempt, 10)`, capped at 60 seconds.
    /// The shift amount is clamped to 10 so `1u64 << attempt` cannot
    /// overflow for large attempt counts.
    pub fn calculate_restart_delay(&self, attempt: u32) -> Duration {
        let base = self.options.restart_delay.as_millis() as u64;
        let max_delay_ms = 60_000u64;
        let delay_ms = base.saturating_mul(1u64 << attempt.min(10));
        Duration::from_millis(delay_ms.min(max_delay_ms))
    }

    /// Takes the stdout/stderr pipes out of `child` and spawns one task per
    /// pipe that forwards each line as a `Stdout`/`Stderr` event. Returns
    /// the task handles so `stop` can abort them. Each task ends naturally
    /// when its pipe reaches EOF (process exit) or errors.
    fn start_output_capture(
        &self,
        server_name: String,
        child: &mut Child,
    ) -> Vec<tokio::task::JoinHandle<()>> {
        let mut handles = Vec::new();
        let event_tx = self.event_tx.clone();

        if let Some(stdout) = child.stdout.take() {
            let name = server_name.clone();
            let tx = event_tx.clone();
            let handle = tokio::spawn(async move {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    // Re-check the subscriber on every line: `subscribe` may
                    // install or replace the sender at any time.
                    if let Some(sender) = tx.lock().await.as_ref() {
                        let _ = sender
                            .send(LifecycleEvent::Stdout {
                                server_name: name.clone(),
                                data: line,
                            })
                            .await;
                    }
                }
            });
            handles.push(handle);
        }

        if let Some(stderr) = child.stderr.take() {
            let name = server_name;
            let tx = event_tx;
            let handle = tokio::spawn(async move {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    if let Some(sender) = tx.lock().await.as_ref() {
                        let _ = sender
                            .send(LifecycleEvent::Stderr {
                                server_name: name.clone(),
                                data: line,
                            })
                            .await;
                    }
                }
            });
            handles.push(handle);
        }

        handles
    }

    /// Spawns a detached task that polls the child with `try_wait` every
    /// 500ms, emits `Crashed` when it exits, and schedules a backoff delay
    /// before marking the server `Starting` again.
    ///
    /// NOTE(review): after bumping `restart_count` and setting the state to
    /// `Starting`, no new process is actually spawned here — confirm that a
    /// separate code path performs the respawn; as written the server would
    /// appear stuck in `Starting` after a crash.
    ///
    /// NOTE(review): the `servers` write guard is held across the awaited
    /// `event_tx.lock().await`/`send` below, blocking all other registry
    /// access for that duration — acceptable with tokio's Mutex but worth
    /// confirming it cannot deadlock against a subscriber that also takes
    /// the registry lock.
    ///
    /// NOTE(review): the task's handle is discarded rather than stored in
    /// `ManagedServer::restart_handle`, so `stop` cannot abort it; it only
    /// terminates via the `break`s when the child/server entry disappears.
    fn start_exit_monitor(&self, server_name: String) {
        if !self.enable_auto_restart {
            return;
        }

        let servers = self.servers.clone();
        let event_tx = self.event_tx.clone();
        let options = self.options.clone();
        let enable_auto_restart = self.enable_auto_restart;

        tokio::spawn(async move {
            loop {
                let should_restart = {
                    let mut servers_guard = servers.write().await;
                    if let Some(server) = servers_guard.get_mut(&server_name) {
                        if let Some(ref mut child) = server.child {
                            match child.try_wait() {
                                Ok(Some(status)) => {
                                    // Process exited: record the crash.
                                    let exit_code = status.code();
                                    server.process.state = ServerState::Crashed;
                                    server.process.stopped_at = Some(Utc::now());
                                    server.process.consecutive_failures += 1;
                                    server.child = None;

                                    if let Some(tx) = event_tx.lock().await.as_ref() {
                                        let _ = tx
                                            .send(LifecycleEvent::Crashed {
                                                server_name: server_name.clone(),
                                                exit_code,
                                            })
                                            .await;
                                    }

                                    // Restart only while under the policy cap.
                                    if enable_auto_restart
                                        && server.process.restart_count < options.max_restarts
                                    {
                                        true
                                    } else {
                                        server.process.state = ServerState::Crashed;
                                        server.process.last_error = Some(format!(
                                            "Process exited with code {:?}, max restarts exceeded",
                                            exit_code
                                        ));
                                        false
                                    }
                                }
                                Ok(None) => {
                                    // Still running; poll again next cycle.
                                    false
                                }
                                Err(e) => {
                                    server.process.last_error = Some(e.to_string());
                                    false
                                }
                            }
                        } else {
                            // Child already taken (e.g. by `stop`): monitor is done.
                            break;
                        }
                    } else {
                        // Server was unregistered: monitor is done.
                        break;
                    }
                };

                if should_restart {
                    let restart_count = {
                        let servers_guard = servers.read().await;
                        servers_guard
                            .get(&server_name)
                            .map(|s| s.process.restart_count)
                            .unwrap_or(0)
                    };

                    // Mirrors `calculate_restart_delay` (can't call it here:
                    // only `options` was moved into this task, not `self`).
                    let base = options.restart_delay.as_millis() as u64;
                    let delay_ms = base.saturating_mul(1u64 << restart_count.min(10));
                    let delay = Duration::from_millis(delay_ms.min(60_000));

                    if let Some(tx) = event_tx.lock().await.as_ref() {
                        let _ = tx
                            .send(LifecycleEvent::Restarting {
                                server_name: server_name.clone(),
                            })
                            .await;
                    }

                    tokio::time::sleep(delay).await;

                    let mut servers_guard = servers.write().await;
                    if let Some(server) = servers_guard.get_mut(&server_name) {
                        server.process.restart_count += 1;
                        server.process.state = ServerState::Starting;
                    }
                }

                // Poll interval between liveness checks.
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });
    }

    /// Spawns a task that, every `health_check_interval`, verifies the child
    /// is still alive via `try_wait` and emits `HealthOk`/`HealthFailed`.
    /// The task exits when the server leaves the `Running` state or is
    /// unregistered. Note: `tokio::time::interval` fires immediately on the
    /// first tick, so the first check happens right after start.
    fn start_health_check_monitor(&self, server_name: String) -> tokio::task::JoinHandle<()> {
        let servers = self.servers.clone();
        let event_tx = self.event_tx.clone();
        let interval = self.options.health_check_interval;

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let is_running = {
                    let servers_guard = servers.read().await;
                    servers_guard
                        .get(&server_name)
                        .map(|s| s.process.state == ServerState::Running)
                        .unwrap_or(false)
                };

                if !is_running {
                    break;
                }

                // Write lock is needed because `try_wait` takes `&mut Child`.
                let start = std::time::Instant::now();
                let result = {
                    let mut servers_guard = servers.write().await;
                    if let Some(server) = servers_guard.get_mut(&server_name) {
                        if let Some(ref mut child) = server.child {
                            match child.try_wait() {
                                Ok(None) => {
                                    // No exit status yet => process is alive.
                                    HealthCheckResult {
                                        healthy: true,
                                        latency: Some(start.elapsed()),
                                        last_check: Utc::now(),
                                        error: None,
                                    }
                                }
                                Ok(Some(_)) => {
                                    HealthCheckResult {
                                        healthy: false,
                                        latency: Some(start.elapsed()),
                                        last_check: Utc::now(),
                                        error: Some("Process has exited".to_string()),
                                    }
                                }
                                Err(e) => HealthCheckResult {
                                    healthy: false,
                                    latency: Some(start.elapsed()),
                                    last_check: Utc::now(),
                                    error: Some(e.to_string()),
                                },
                            }
                        } else {
                            HealthCheckResult {
                                healthy: false,
                                latency: None,
                                last_check: Utc::now(),
                                error: Some("No child process".to_string()),
                            }
                        }
                    } else {
                        break;
                    }
                };

                if let Some(tx) = event_tx.lock().await.as_ref() {
                    let event = if result.healthy {
                        LifecycleEvent::HealthOk {
                            server_name: server_name.clone(),
                            result,
                        }
                    } else {
                        LifecycleEvent::HealthFailed {
                            server_name: server_name.clone(),
                            result,
                        }
                    };
                    let _ = tx.send(event).await;
                }
            }
        })
    }

    /// Depth-first topological sort of the dependency graph: dependencies
    /// appear before their dependents in the returned order.
    ///
    /// Cycles are broken silently — the `temp_visited` early-return skips
    /// back-edges instead of reporting an error, so a cyclic graph still
    /// yields a (partial) order rather than failing. Dependencies naming
    /// unregistered servers are simply ignored.
    pub(crate) fn topological_sort(&self, servers: &HashMap<String, ManagedServer>) -> Vec<String> {
        let mut result = Vec::new();
        let mut visited = std::collections::HashSet::new();
        let mut temp_visited = std::collections::HashSet::new();

        fn visit(
            name: &str,
            servers: &HashMap<String, ManagedServer>,
            visited: &mut std::collections::HashSet<String>,
            temp_visited: &mut std::collections::HashSet<String>,
            result: &mut Vec<String>,
        ) {
            if visited.contains(name) {
                return;
            }
            if temp_visited.contains(name) {
                // Back-edge: cycle detected; skip instead of recursing forever.
                return;
            }

            temp_visited.insert(name.to_string());

            if let Some(server) = servers.get(name) {
                for dep in &server.dependencies {
                    visit(dep, servers, visited, temp_visited, result);
                }
            }

            temp_visited.remove(name);
            visited.insert(name.to_string());
            result.push(name.to_string());
        }

        for name in servers.keys() {
            visit(name, servers, &mut visited, &mut temp_visited, &mut result);
        }

        result
    }
}
550
551impl Default for McpLifecycleManager {
552 fn default() -> Self {
553 Self::new()
554 }
555}
556
#[async_trait]
impl LifecycleManager for McpLifecycleManager {
    /// Registers the server via a fire-and-forget task (the trait method is
    /// not async, so the write lock cannot be awaited inline).
    ///
    /// NOTE(review): registration is therefore racy — the entry may not yet
    /// exist when this returns. The tests below compensate with 50ms sleeps;
    /// confirm callers are aware of this window.
    fn register_server(&self, name: &str, config: McpServerConfig) {
        let servers = self.servers.clone();
        let name = name.to_string();
        tokio::spawn(async move {
            let mut servers_guard = servers.write().await;
            servers_guard.insert(name.clone(), ManagedServer::new(name, config));
        });
    }

    /// Force-stops the server if running, then removes it from the registry.
    async fn unregister_server(&self, name: &str) -> McpResult<()> {
        if self.is_running(name) {
            self.stop(
                name,
                Some(StopOptions {
                    force: true,
                    reason: Some("Unregistering server".to_string()),
                }),
            )
            .await?;
        }

        let mut servers = self.servers.write().await;
        servers.remove(name);
        Ok(())
    }

    /// Replaces the dependency list for `name`. Fire-and-forget like
    /// `register_server`, with the same visibility race; silently does
    /// nothing if `name` is not (yet) registered.
    fn set_dependencies(&self, name: &str, dependencies: Vec<String>) {
        let servers = self.servers.clone();
        let name = name.to_string();
        tokio::spawn(async move {
            let mut servers_guard = servers.write().await;
            if let Some(server) = servers_guard.get_mut(&name) {
                server.dependencies = dependencies;
            }
        });
    }

    /// Starts one stdio server: validates, spawns the process, wires up
    /// output capture, the exit monitor, and (optionally) the health-check
    /// monitor, and emits `Starting`/`Started` (or `Error`) events.
    ///
    /// Returns Ok(()) without doing anything if the server is already
    /// `Running` and `force` is not set.
    async fn start(&self, server_name: &str, options: Option<StartOptions>) -> McpResult<()> {
        let options = options.unwrap_or_default();

        // Phase 1 (read lock): validate and snapshot the config.
        let config = {
            let servers = self.servers.read().await;
            let server = servers.get(server_name).ok_or_else(|| {
                McpError::lifecycle(
                    format!("Server not registered: {}", server_name),
                    Some(server_name.to_string()),
                )
            })?;

            if !options.force && server.process.state == ServerState::Running {
                return Ok(());
            }

            // Only stdio transports correspond to a local child process.
            if server.config.transport_type != TransportType::Stdio {
                return Err(McpError::lifecycle(
                    format!(
                        "Only stdio servers can be started as processes, got {:?}",
                        server.config.transport_type
                    ),
                    Some(server_name.to_string()),
                ));
            }

            server.config.clone()
        };

        self.emit_event(LifecycleEvent::Starting {
            server_name: server_name.to_string(),
        })
        .await;

        // Phase 2 (write lock): mark as starting.
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Starting;
            }
        }

        let command = config.command.ok_or_else(|| {
            McpError::lifecycle(
                "Stdio server requires a command".to_string(),
                Some(server_name.to_string()),
            )
        })?;

        let args = config.args.unwrap_or_default();
        let env = config.env.unwrap_or_default();

        // All three stdio pipes are captured; kill_on_drop ensures the OS
        // process dies if the Child handle is dropped without an explicit stop.
        let mut cmd = Command::new(&command);
        cmd.args(&args)
            .envs(&env)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        // NOTE(review): `cmd.spawn()` is synchronous, so this timeout can
        // essentially never fire — it does not cover "process up and ready",
        // only the spawn call itself. Confirm the intended semantics of
        // `startup_timeout`.
        let startup_timeout = self.options.startup_timeout;
        let spawn_result = tokio::time::timeout(startup_timeout, async { cmd.spawn() }).await;

        let mut child = match spawn_result {
            Ok(Ok(child)) => child,
            Ok(Err(e)) => {
                // Spawn failed: record the error and notify subscribers.
                {
                    let mut servers = self.servers.write().await;
                    if let Some(server) = servers.get_mut(server_name) {
                        server.process.state = ServerState::Error;
                        server.process.last_error = Some(e.to_string());
                        server.process.consecutive_failures += 1;
                    }
                }

                self.emit_event(LifecycleEvent::Error {
                    server_name: server_name.to_string(),
                    error: e.to_string(),
                })
                .await;

                return Err(McpError::lifecycle(
                    format!("Failed to spawn process: {}", e),
                    Some(server_name.to_string()),
                ));
            }
            Err(_) => {
                // Timeout elapsed before spawn returned.
                {
                    let mut servers = self.servers.write().await;
                    if let Some(server) = servers.get_mut(server_name) {
                        server.process.state = ServerState::Error;
                        server.process.last_error = Some("Startup timeout".to_string());
                        server.process.consecutive_failures += 1;
                    }
                }

                self.emit_event(LifecycleEvent::Error {
                    server_name: server_name.to_string(),
                    error: "Startup timeout".to_string(),
                })
                .await;

                return Err(McpError::lifecycle(
                    format!("Startup timeout after {:?}", startup_timeout),
                    Some(server_name.to_string()),
                ));
            }
        };

        let pid = child.id();

        // Must run before the child is stored: it takes the stdout/stderr pipes.
        let output_handles = self.start_output_capture(server_name.to_string(), &mut child);

        // Phase 3 (write lock): record the running process.
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Running;
                server.process.pid = pid;
                server.process.started_at = Some(Utc::now());
                server.process.stopped_at = None;
                server.process.consecutive_failures = 0;
                server.child = Some(child);
                server.output_handles = output_handles;
            }
        }

        self.start_exit_monitor(server_name.to_string());

        if self.enable_health_checks {
            let handle = self.start_health_check_monitor(server_name.to_string());
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.health_check_handle = Some(handle);
            }
        }

        self.emit_event(LifecycleEvent::Started {
            server_name: server_name.to_string(),
            pid,
        })
        .await;

        Ok(())
    }

    /// Starts every registered server in dependency order. Individual
    /// failures are logged and skipped rather than aborting the batch,
    /// so this always returns Ok(()).
    async fn start_all(&self) -> McpResult<()> {
        let server_names: Vec<String> = {
            let servers = self.servers.read().await;
            self.topological_sort(&servers)
        };

        for name in server_names {
            if let Err(e) = self.start(&name, None).await {
                tracing::warn!("Failed to start server {}: {}", name, e);
            }
        }

        Ok(())
    }

    /// Recursively starts each dependency, then the server itself.
    /// (`start` is idempotent for already-running servers.)
    ///
    /// NOTE(review): unlike `topological_sort`, this recursion has no cycle
    /// guard — a dependency cycle would recurse until resources are
    /// exhausted. Confirm cycles are rejected before reaching this point.
    async fn start_with_dependencies(&self, server_name: &str) -> McpResult<()> {
        let dependencies = {
            let servers = self.servers.read().await;
            servers
                .get(server_name)
                .map(|s| s.dependencies.clone())
                .unwrap_or_default()
        };

        for dep in dependencies {
            self.start_with_dependencies(&dep).await?;
        }

        self.start(server_name, None).await
    }

    /// Stops one server: emits `Stopping`, takes ownership of the child,
    /// kills it (force) or waits for exit with a timeout (graceful), aborts
    /// all background tasks, records the stopped state, and emits `Stopped`.
    /// A no-op returning Ok(()) when there is no live child.
    async fn stop(&self, server_name: &str, options: Option<StopOptions>) -> McpResult<()> {
        let options = options.unwrap_or_default();

        let child_exists = {
            let servers = self.servers.read().await;
            servers
                .get(server_name)
                .map(|s| s.child.is_some())
                .unwrap_or(false)
        };

        if !child_exists {
            return Ok(());
        }

        self.emit_event(LifecycleEvent::Stopping {
            server_name: server_name.to_string(),
            reason: options.reason.clone(),
        })
        .await;

        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Stopping;
            }
        }

        // Take the child out of the registry so the lock is not held while
        // waiting on the process (and so the exit monitor sees it gone).
        let mut child = {
            let mut servers = self.servers.write().await;
            servers.get_mut(server_name).and_then(|s| s.child.take())
        };

        if let Some(ref mut child) = child {
            if options.force {
                let _ = child.kill().await;
            } else {
                let shutdown_timeout = self.options.shutdown_timeout;

                // Graceful path: tokio's `Child::wait` closes the piped
                // stdin before waiting, which is what prompts a well-behaved
                // stdio server to exit. If it does not exit within the
                // timeout, fall back to a hard kill.
                let wait_result = tokio::time::timeout(shutdown_timeout, child.wait()).await;

                match wait_result {
                    Ok(Ok(_)) => {
                        // Exited cleanly within the timeout.
                    }
                    Ok(Err(e)) => {
                        tracing::warn!("Error waiting for process: {}", e);
                    }
                    Err(_) => {
                        tracing::warn!(
                            "Graceful shutdown timeout for {}, force killing",
                            server_name
                        );
                        let _ = child.kill().await;
                    }
                }
            }
        }

        // Tear down all background tasks for this server.
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                for handle in server.output_handles.drain(..) {
                    handle.abort();
                }
                if let Some(handle) = server.health_check_handle.take() {
                    handle.abort();
                }
                if let Some(handle) = server.restart_handle.take() {
                    handle.abort();
                }
            }
        }

        // Record the final stopped state.
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Stopped;
                server.process.pid = None;
                server.process.stopped_at = Some(Utc::now());
                server.child = None;
            }
        }

        self.emit_event(LifecycleEvent::Stopped {
            server_name: server_name.to_string(),
        })
        .await;

        Ok(())
    }

    /// Stops all servers in reverse dependency order (dependents before
    /// their dependencies). Individual failures are logged and skipped.
    async fn stop_all(&self, force: bool) -> McpResult<()> {
        let server_names: Vec<String> = {
            let servers = self.servers.read().await;
            let mut sorted = self.topological_sort(&servers);
            sorted.reverse();
            sorted
        };

        for name in server_names {
            let options = StopOptions {
                force,
                reason: Some("Stopping all servers".to_string()),
            };
            if let Err(e) = self.stop(&name, Some(options)).await {
                tracing::warn!("Failed to stop server {}: {}", name, e);
            }
        }

        Ok(())
    }

    /// Graceful stop followed by a fresh start.
    async fn restart(&self, server_name: &str) -> McpResult<()> {
        self.stop(server_name, None).await?;
        self.start(server_name, None).await
    }

    /// Graceful `stop_all` followed by `start_all`.
    async fn restart_all(&self) -> McpResult<()> {
        self.stop_all(false).await?;
        self.start_all().await
    }

    /// Synchronous liveness probe: `try_wait` on the child (which requires
    /// the write lock, since it takes `&mut Child`). Distinguishes
    /// "not running" (registered, no child) from "not found" (unregistered).
    async fn health_check(&self, server_name: &str) -> HealthCheckResult {
        let start = std::time::Instant::now();

        let mut servers = self.servers.write().await;
        if let Some(server) = servers.get_mut(server_name) {
            if let Some(ref mut child) = server.child {
                match child.try_wait() {
                    Ok(None) => {
                        // No exit status yet => alive.
                        HealthCheckResult {
                            healthy: true,
                            latency: Some(start.elapsed()),
                            last_check: Utc::now(),
                            error: None,
                        }
                    }
                    Ok(Some(status)) => {
                        HealthCheckResult {
                            healthy: false,
                            latency: Some(start.elapsed()),
                            last_check: Utc::now(),
                            error: Some(format!("Process exited with status: {:?}", status)),
                        }
                    }
                    Err(e) => HealthCheckResult {
                        healthy: false,
                        latency: Some(start.elapsed()),
                        last_check: Utc::now(),
                        error: Some(e.to_string()),
                    },
                }
            } else {
                HealthCheckResult {
                    healthy: false,
                    latency: None,
                    last_check: Utc::now(),
                    error: Some("Server not running".to_string()),
                }
            }
        } else {
            HealthCheckResult {
                healthy: false,
                latency: None,
                last_check: Utc::now(),
                error: Some("Server not found".to_string()),
            }
        }
    }

    /// Runs `health_check` sequentially for every registered server.
    async fn health_check_all(&self) -> HashMap<String, HealthCheckResult> {
        let server_names: Vec<String> = {
            let servers = self.servers.read().await;
            servers.keys().cloned().collect()
        };

        let mut results = HashMap::new();
        for name in server_names {
            let result = self.health_check(&name).await;
            results.insert(name, result);
        }
        results
    }

    /// Best-effort state query: uses `try_read`, so under write-lock
    /// contention (or for unknown servers) it falls back to `Stopped`.
    fn get_state(&self, server_name: &str) -> ServerState {
        self.servers
            .try_read()
            .ok()
            .and_then(|servers| servers.get(server_name).map(|s| s.process.state))
            .unwrap_or(ServerState::Stopped)
    }

    /// Best-effort metadata snapshot; `None` if unregistered or the lock
    /// is currently held for writing.
    fn get_process(&self, server_name: &str) -> Option<ServerProcess> {
        self.servers
            .try_read()
            .ok()
            .and_then(|servers| servers.get(server_name).map(|s| s.process.clone()))
    }

    /// Best-effort snapshot of all servers; empty under lock contention.
    fn get_all_processes(&self) -> Vec<ServerProcess> {
        self.servers
            .try_read()
            .map(|servers| servers.values().map(|s| s.process.clone()).collect())
            .unwrap_or_default()
    }

    /// True iff `get_state` reports `Running` (subject to the same
    /// best-effort caveats as `get_state`).
    fn is_running(&self, server_name: &str) -> bool {
        self.get_state(server_name) == ServerState::Running
    }

    /// Best-effort list of servers currently in the `Running` state.
    fn get_running_servers(&self) -> Vec<String> {
        self.servers
            .try_read()
            .map(|servers| {
                servers
                    .iter()
                    .filter(|(_, s)| s.process.state == ServerState::Running)
                    .map(|(name, _)| name.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Installs a new event subscriber (channel capacity 100), replacing any
    /// previous one — only a single subscriber receives events.
    ///
    /// NOTE(review): the sender is installed by a spawned task, so events
    /// emitted between this call and that task running are dropped; callers
    /// that need the very first events must account for this window.
    fn subscribe(&self) -> mpsc::Receiver<LifecycleEvent> {
        let (tx, rx) = mpsc::channel(100);
        let event_tx = self.event_tx.clone();
        tokio::spawn(async move {
            *event_tx.lock().await = Some(tx);
        });
        rx
    }

    /// Force-stops every server and clears the registry.
    async fn cleanup(&self) -> McpResult<()> {
        self.stop_all(true).await?;

        let mut servers = self.servers.write().await;
        servers.clear();

        Ok(())
    }
}
1053
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Minimal stdio config using `echo`, which exits immediately — fine for
    /// registry/ordering tests that never actually start the process.
    fn create_test_config() -> McpServerConfig {
        McpServerConfig {
            transport_type: TransportType::Stdio,
            command: Some("echo".to_string()),
            args: Some(vec!["hello".to_string()]),
            env: None,
            url: None,
            headers: None,
            enabled: true,
            timeout: Duration::from_secs(30),
            retries: 3,
            auto_approve: vec![],
            log_level: Default::default(),
        }
    }

    #[test]
    fn test_lifecycle_manager_new() {
        let manager = McpLifecycleManager::new();
        assert!(manager.get_all_processes().is_empty());
    }

    #[test]
    fn test_lifecycle_manager_with_options() {
        let options = LifecycleOptions {
            startup_timeout: Duration::from_secs(60),
            max_restarts: 5,
            ..Default::default()
        };
        let manager = McpLifecycleManager::with_options(options);
        assert_eq!(manager.options.startup_timeout, Duration::from_secs(60));
        assert_eq!(manager.options.max_restarts, 5);
    }

    /// Backoff must grow with the attempt number and stay under the 60s cap.
    #[test]
    fn test_calculate_restart_delay() {
        let manager = McpLifecycleManager::new();

        let delay0 = manager.calculate_restart_delay(0);
        let delay1 = manager.calculate_restart_delay(1);
        let delay2 = manager.calculate_restart_delay(2);

        assert!(delay1 > delay0);
        assert!(delay2 > delay1);

        // Shift is clamped at 10, so even absurd attempts respect the cap.
        let delay_max = manager.calculate_restart_delay(100);
        assert!(delay_max <= Duration::from_secs(60));
    }

    #[test]
    fn test_server_state_default() {
        let process = ServerProcess::new("test".to_string());
        assert_eq!(process.state, ServerState::Stopped);
        assert_eq!(process.restart_count, 0);
        assert!(process.pid.is_none());
    }

    #[test]
    fn test_start_options_default() {
        let options = StartOptions::default();
        assert!(!options.force);
        assert!(!options.wait_for_ready);
        assert!(options.dependencies.is_empty());
    }

    #[test]
    fn test_stop_options_default() {
        let options = StopOptions::default();
        assert!(!options.force);
        assert!(options.reason.is_none());
    }

    #[test]
    fn test_lifecycle_options_default() {
        let options = LifecycleOptions::default();
        assert_eq!(options.startup_timeout, Duration::from_secs(30));
        assert_eq!(options.shutdown_timeout, Duration::from_secs(10));
        assert_eq!(options.max_restarts, 3);
    }

    #[tokio::test]
    async fn test_register_and_get_process() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);

        // register_server applies asynchronously; give its task time to run.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let process = manager.get_process("test-server");
        assert!(process.is_some());
        assert_eq!(process.unwrap().name, "test-server");
    }

    #[tokio::test]
    async fn test_get_state_unregistered() {
        let manager = McpLifecycleManager::new();
        let state = manager.get_state("nonexistent");
        assert_eq!(state, ServerState::Stopped);
    }

    #[tokio::test]
    async fn test_is_running_not_started() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(!manager.is_running("test-server"));
    }

    #[tokio::test]
    async fn test_get_running_servers_empty() {
        let manager = McpLifecycleManager::new();
        let running = manager.get_running_servers();
        assert!(running.is_empty());
    }

    #[tokio::test]
    async fn test_set_dependencies() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("server-a", config.clone());
        manager.register_server("server-b", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        // set_dependencies is also fire-and-forget; wait for it to land.
        manager.set_dependencies("server-b", vec!["server-a".to_string()]);
        tokio::time::sleep(Duration::from_millis(50)).await;

        let servers = manager.servers.read().await;
        let server_b = servers.get("server-b").unwrap();
        assert_eq!(server_b.dependencies, vec!["server-a".to_string()]);
    }

    /// a <- b <- c dependency chain must sort as a, b, c.
    #[tokio::test]
    async fn test_topological_sort() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("server-a", config.clone());
        manager.register_server("server-b", config.clone());
        manager.register_server("server-c", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        manager.set_dependencies("server-c", vec!["server-b".to_string()]);
        manager.set_dependencies("server-b", vec!["server-a".to_string()]);
        tokio::time::sleep(Duration::from_millis(50)).await;

        let servers = manager.servers.read().await;
        let sorted = manager.topological_sort(&servers);

        let pos_a = sorted.iter().position(|x| x == "server-a").unwrap();
        let pos_b = sorted.iter().position(|x| x == "server-b").unwrap();
        let pos_c = sorted.iter().position(|x| x == "server-c").unwrap();

        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[tokio::test]
    async fn test_unregister_server() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(manager.get_process("test-server").is_some());

        manager.unregister_server("test-server").await.unwrap();

        assert!(manager.get_process("test-server").is_none());
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("server-1", config.clone());
        manager.register_server("server-2", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(manager.get_all_processes().len(), 2);

        manager.cleanup().await.unwrap();

        assert!(manager.get_all_processes().is_empty());
    }

    #[tokio::test]
    async fn test_health_check_not_running() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Registered but never started => unhealthy with an error message.
        let result = manager.health_check("test-server").await;
        assert!(!result.healthy);
        assert!(result.error.is_some());
    }

    #[tokio::test]
    async fn test_health_check_nonexistent() {
        let manager = McpLifecycleManager::new();

        let result = manager.health_check("nonexistent").await;
        assert!(!result.healthy);
        assert!(result.error.unwrap().contains("not found"));
    }

    /// Smoke test: subscribing must not panic; the sender is installed by a
    /// spawned task, hence the short sleep.
    #[tokio::test]
    async fn test_subscribe_events() {
        let manager = McpLifecycleManager::new();
        let _rx = manager.subscribe();

        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}