aster/mcp/lifecycle_manager.rs

//! MCP Lifecycle Manager
//!
//! This module implements the lifecycle manager for MCP servers.
//! It manages server processes including starting, stopping, restarting,
//! health monitoring, and dependency management.
//!
//! # Features
//!
//! - Start/stop server processes with configurable timeouts
//! - Automatic restart with exponential backoff on unexpected exits
//! - Health check monitoring
//! - Dependency-based startup ordering
//! - stdout/stderr capture and event emission
//!
//! # Requirements Coverage
//!
//! - 3.1: Server startup with command and arguments
//! - 3.2: Automatic restart with backoff on unexpected exit
//! - 3.3: Server state tracking (stopped, starting, running, stopping, error, crashed)
//! - 3.4: Maximum restart attempts before marking as crashed
//! - 3.5: Graceful shutdown with configurable timeout
//! - 3.6: Force kill after graceful shutdown timeout
//! - 3.7: stdout/stderr capture and event emission
//! - 3.8: Dependency-based startup ordering
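//!
//! # Example
//!
//! A minimal usage sketch (illustrative only: the `config` value and the
//! `"files"` server name are assumptions, not part of this module):
//!
//! ```ignore
//! let manager = McpLifecycleManager::new();
//! let mut events = manager.subscribe();
//!
//! // Registration completes asynchronously; yield briefly before starting.
//! manager.register_server("files", config);
//! manager.start("files", None).await?;
//!
//! while let Some(event) = events.recv().await {
//!     match event {
//!         LifecycleEvent::Started { server_name, pid } => {
//!             println!("{server_name} started (pid {pid:?})");
//!         }
//!         LifecycleEvent::Crashed { server_name, exit_code } => {
//!             println!("{server_name} crashed (exit {exit_code:?})");
//!         }
//!         _ => {}
//!     }
//! }
//! ```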

use async_trait::async_trait;
use chrono::Utc;
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{mpsc, Mutex, RwLock};

use crate::mcp::error::{McpError, McpResult};
use crate::mcp::types::{
    HealthCheckResult, LifecycleOptions, McpServerConfig, ServerProcess, ServerState, TransportType,
};

/// Lifecycle event for monitoring server state changes
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
    /// Server is starting
    Starting { server_name: String },
    /// Server started successfully
    Started {
        server_name: String,
        pid: Option<u32>,
    },
    /// Server is stopping
    Stopping {
        server_name: String,
        reason: Option<String>,
    },
    /// Server stopped
    Stopped { server_name: String },
    /// Server error occurred
    Error { server_name: String, error: String },
    /// Server crashed
    Crashed {
        server_name: String,
        exit_code: Option<i32>,
    },
    /// Server is restarting
    Restarting { server_name: String },
    /// Health check passed
    HealthOk {
        server_name: String,
        result: HealthCheckResult,
    },
    /// Health check failed
    HealthFailed {
        server_name: String,
        result: HealthCheckResult,
    },
    /// stdout output from server
    Stdout { server_name: String, data: String },
    /// stderr output from server
    Stderr { server_name: String, data: String },
}

/// Start options for server startup
#[derive(Debug, Clone, Default)]
pub struct StartOptions {
    /// Force start even if already running
    pub force: bool,
    /// Wait for server to be ready
    pub wait_for_ready: bool,
    /// Dependencies to start first
    pub dependencies: Vec<String>,
}

/// Stop options for server shutdown
#[derive(Debug, Clone, Default)]
pub struct StopOptions {
    /// Force stop (skip graceful shutdown)
    pub force: bool,
    /// Reason for stopping
    pub reason: Option<String>,
}

/// Internal server state with process handle
pub(crate) struct ManagedServer {
    /// Server process info
    process: ServerProcess,
    /// Server configuration
    config: McpServerConfig,
    /// Child process handle (if running)
    child: Option<Child>,
    /// Dependencies (server names)
    dependencies: Vec<String>,
    /// Output capture task handles
    output_handles: Vec<tokio::task::JoinHandle<()>>,
    /// Health check task handle
    health_check_handle: Option<tokio::task::JoinHandle<()>>,
    /// Auto-restart task handle
    restart_handle: Option<tokio::task::JoinHandle<()>>,
}

impl ManagedServer {
    fn new(name: String, config: McpServerConfig) -> Self {
        Self {
            process: ServerProcess::new(name),
            config,
            child: None,
            dependencies: Vec::new(),
            output_handles: Vec::new(),
            health_check_handle: None,
            restart_handle: None,
        }
    }
}

/// Lifecycle manager trait
///
/// Defines the interface for managing MCP server lifecycles.
#[async_trait]
pub trait LifecycleManager: Send + Sync {
    /// Register a server configuration
    fn register_server(&self, name: &str, config: McpServerConfig);

    /// Unregister a server
    async fn unregister_server(&self, name: &str) -> McpResult<()>;

    /// Set server dependencies
    fn set_dependencies(&self, name: &str, dependencies: Vec<String>);

    /// Start a server
    async fn start(&self, server_name: &str, options: Option<StartOptions>) -> McpResult<()>;

    /// Start all registered servers
    async fn start_all(&self) -> McpResult<()>;

    /// Start a server with its dependencies
    async fn start_with_dependencies(&self, server_name: &str) -> McpResult<()>;

    /// Stop a server
    async fn stop(&self, server_name: &str, options: Option<StopOptions>) -> McpResult<()>;

    /// Stop all servers
    async fn stop_all(&self, force: bool) -> McpResult<()>;

    /// Restart a server
    async fn restart(&self, server_name: &str) -> McpResult<()>;

    /// Restart all servers
    async fn restart_all(&self) -> McpResult<()>;

    /// Perform health check on a server
    async fn health_check(&self, server_name: &str) -> HealthCheckResult;

    /// Perform health check on all servers
    async fn health_check_all(&self) -> HashMap<String, HealthCheckResult>;

    /// Get server state
    fn get_state(&self, server_name: &str) -> ServerState;

    /// Get server process info
    fn get_process(&self, server_name: &str) -> Option<ServerProcess>;

    /// Get all server processes
    fn get_all_processes(&self) -> Vec<ServerProcess>;

    /// Check if a server is running
    fn is_running(&self, server_name: &str) -> bool;

    /// Get list of running servers
    fn get_running_servers(&self) -> Vec<String>;

    /// Subscribe to lifecycle events
    ///
    /// Note: subscribing installs a new event channel and replaces any previous
    /// subscriber, so only the most recent receiver observes events.
    fn subscribe(&self) -> mpsc::Receiver<LifecycleEvent>;

    /// Cleanup all resources
    async fn cleanup(&self) -> McpResult<()>;
}

/// Default implementation of the lifecycle manager
pub struct McpLifecycleManager {
    /// Managed servers
    pub(crate) servers: Arc<RwLock<HashMap<String, ManagedServer>>>,
    /// Default lifecycle options
    pub options: LifecycleOptions,
    /// Event channel sender
    event_tx: Arc<Mutex<Option<mpsc::Sender<LifecycleEvent>>>>,
    /// Enable auto-restart
    enable_auto_restart: bool,
    /// Enable health checks
    enable_health_checks: bool,
}

impl McpLifecycleManager {
    /// Create a new lifecycle manager with default options
    pub fn new() -> Self {
        Self::with_options(LifecycleOptions::default())
    }

    /// Create a new lifecycle manager with custom options
    pub fn with_options(options: LifecycleOptions) -> Self {
        Self {
            servers: Arc::new(RwLock::new(HashMap::new())),
            options,
            event_tx: Arc::new(Mutex::new(None)),
            enable_auto_restart: true,
            enable_health_checks: true,
        }
    }

    /// Enable or disable auto-restart
    pub fn set_auto_restart_enabled(&mut self, enabled: bool) {
        self.enable_auto_restart = enabled;
    }

    /// Enable or disable health checks
    pub fn set_health_checks_enabled(&mut self, enabled: bool) {
        self.enable_health_checks = enabled;
    }

    /// Emit a lifecycle event
    async fn emit_event(&self, event: LifecycleEvent) {
        if let Some(tx) = self.event_tx.lock().await.as_ref() {
            let _ = tx.send(event).await;
        }
    }

    /// Calculate restart delay with exponential backoff
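    ///
    /// A worked example (assuming a base `restart_delay` of 1 s, which may not
    /// match your `LifecycleOptions`): attempt 0 waits 1 s, attempt 1 waits 2 s,
    /// attempt 2 waits 4 s, and so on, with the shift clamped at 2^10 and the
    /// final delay capped at 60 s.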
    pub fn calculate_restart_delay(&self, attempt: u32) -> Duration {
        let base = self.options.restart_delay.as_millis() as u64;
        // Exponential backoff: base * 2^attempt, capped at 60 seconds
        let max_delay_ms = 60_000u64;
        let delay_ms = base.saturating_mul(1u64 << attempt.min(10));
        Duration::from_millis(delay_ms.min(max_delay_ms))
    }

    /// Start output capture for a child process
    fn start_output_capture(
        &self,
        server_name: String,
        child: &mut Child,
    ) -> Vec<tokio::task::JoinHandle<()>> {
        let mut handles = Vec::new();
        let event_tx = self.event_tx.clone();

        // Capture stdout
        if let Some(stdout) = child.stdout.take() {
            let name = server_name.clone();
            let tx = event_tx.clone();
            let handle = tokio::spawn(async move {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    if let Some(sender) = tx.lock().await.as_ref() {
                        let _ = sender
                            .send(LifecycleEvent::Stdout {
                                server_name: name.clone(),
                                data: line,
                            })
                            .await;
                    }
                }
            });
            handles.push(handle);
        }

        // Capture stderr
        if let Some(stderr) = child.stderr.take() {
            let name = server_name;
            let tx = event_tx;
            let handle = tokio::spawn(async move {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    if let Some(sender) = tx.lock().await.as_ref() {
                        let _ = sender
                            .send(LifecycleEvent::Stderr {
                                server_name: name.clone(),
                                data: line,
                            })
                            .await;
                    }
                }
            });
            handles.push(handle);
        }

        handles
    }

    /// Start monitoring a process for unexpected exit
    fn start_exit_monitor(&self, server_name: String) {
        if !self.enable_auto_restart {
            return;
        }

        let servers = self.servers.clone();
        let event_tx = self.event_tx.clone();
        let options = self.options.clone();
        let enable_auto_restart = self.enable_auto_restart;

        tokio::spawn(async move {
            loop {
                // Check if process is still running
                let should_restart = {
                    let mut servers_guard = servers.write().await;
                    if let Some(server) = servers_guard.get_mut(&server_name) {
                        if let Some(ref mut child) = server.child {
                            match child.try_wait() {
                                Ok(Some(status)) => {
                                    // Process exited
                                    let exit_code = status.code();
                                    server.process.state = ServerState::Crashed;
                                    server.process.stopped_at = Some(Utc::now());
                                    server.process.consecutive_failures += 1;
                                    server.child = None;

                                    // Emit crashed event
                                    if let Some(tx) = event_tx.lock().await.as_ref() {
                                        let _ = tx
                                            .send(LifecycleEvent::Crashed {
                                                server_name: server_name.clone(),
                                                exit_code,
                                            })
                                            .await;
                                    }

                                    // Check if we should restart
                                    if enable_auto_restart
                                        && server.process.restart_count < options.max_restarts
                                    {
                                        true
                                    } else {
                                        server.process.state = ServerState::Crashed;
                                        server.process.last_error = Some(format!(
                                            "Process exited with code {:?}, max restarts exceeded",
                                            exit_code
                                        ));
                                        false
                                    }
                                }
                                Ok(None) => {
                                    // Process still running
                                    false
                                }
                                Err(e) => {
                                    // Error checking process
                                    server.process.last_error = Some(e.to_string());
                                    false
                                }
                            }
                        } else {
                            // No child process, stop monitoring
                            break;
                        }
                    } else {
                        // Server not found, stop monitoring
                        break;
                    }
                };

                if should_restart {
                    // Calculate delay based on restart count
                    let restart_count = {
                        let servers_guard = servers.read().await;
                        servers_guard
                            .get(&server_name)
                            .map(|s| s.process.restart_count)
                            .unwrap_or(0)
                    };

                    let base = options.restart_delay.as_millis() as u64;
                    let delay_ms = base.saturating_mul(1u64 << restart_count.min(10));
                    let delay = Duration::from_millis(delay_ms.min(60_000));

                    // Emit restarting event
                    if let Some(tx) = event_tx.lock().await.as_ref() {
                        let _ = tx
                            .send(LifecycleEvent::Restarting {
                                server_name: server_name.clone(),
                            })
                            .await;
                    }

                    tokio::time::sleep(delay).await;

                    // Attempt restart
                    let mut servers_guard = servers.write().await;
                    if let Some(server) = servers_guard.get_mut(&server_name) {
                        server.process.restart_count += 1;
                        // Note: this monitor only marks the server as Starting;
                        // the actual respawn must be driven by a later call to
                        // `start` (a known limitation of this implementation).
                        server.process.state = ServerState::Starting;
                    }
                }

                // Sleep before next check
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });
    }

    /// Start health check monitoring for a server
    fn start_health_check_monitor(&self, server_name: String) -> tokio::task::JoinHandle<()> {
        let servers = self.servers.clone();
        let event_tx = self.event_tx.clone();
        let interval = self.options.health_check_interval;

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let is_running = {
                    let servers_guard = servers.read().await;
                    servers_guard
                        .get(&server_name)
                        .map(|s| s.process.state == ServerState::Running)
                        .unwrap_or(false)
                };

                if !is_running {
                    break;
                }

                // Perform health check (check if process is still alive)
                let start = std::time::Instant::now();
                let result = {
                    let mut servers_guard = servers.write().await;
                    if let Some(server) = servers_guard.get_mut(&server_name) {
                        if let Some(ref mut child) = server.child {
                            match child.try_wait() {
                                Ok(None) => {
                                    // Process is still running
                                    HealthCheckResult {
                                        healthy: true,
                                        latency: Some(start.elapsed()),
                                        last_check: Utc::now(),
                                        error: None,
                                    }
                                }
                                Ok(Some(_)) => {
                                    // Process has exited
                                    HealthCheckResult {
                                        healthy: false,
                                        latency: Some(start.elapsed()),
                                        last_check: Utc::now(),
                                        error: Some("Process has exited".to_string()),
                                    }
                                }
                                Err(e) => HealthCheckResult {
                                    healthy: false,
                                    latency: Some(start.elapsed()),
                                    last_check: Utc::now(),
                                    error: Some(e.to_string()),
                                },
                            }
                        } else {
                            HealthCheckResult {
                                healthy: false,
                                latency: None,
                                last_check: Utc::now(),
                                error: Some("No child process".to_string()),
                            }
                        }
                    } else {
                        break;
                    }
                };

                // Emit health event
                if let Some(tx) = event_tx.lock().await.as_ref() {
                    let event = if result.healthy {
                        LifecycleEvent::HealthOk {
                            server_name: server_name.clone(),
                            result,
                        }
                    } else {
                        LifecycleEvent::HealthFailed {
                            server_name: server_name.clone(),
                            result,
                        }
                    };
                    let _ = tx.send(event).await;
                }
            }
        })
    }

    /// Get topologically sorted server names based on dependencies
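    ///
    /// For example, if `c` depends on `b` and `b` depends on `a`, the result
    /// lists `a` before `b` before `c`. Cycles are not reported as errors; the
    /// offending back-edge is simply skipped.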
    pub(crate) fn topological_sort(&self, servers: &HashMap<String, ManagedServer>) -> Vec<String> {
        let mut result = Vec::new();
        let mut visited = std::collections::HashSet::new();
        let mut temp_visited = std::collections::HashSet::new();

        fn visit(
            name: &str,
            servers: &HashMap<String, ManagedServer>,
            visited: &mut std::collections::HashSet<String>,
            temp_visited: &mut std::collections::HashSet<String>,
            result: &mut Vec<String>,
        ) {
            if visited.contains(name) {
                return;
            }
            if temp_visited.contains(name) {
                // Cycle detected, skip
                return;
            }

            temp_visited.insert(name.to_string());

            if let Some(server) = servers.get(name) {
                for dep in &server.dependencies {
                    visit(dep, servers, visited, temp_visited, result);
                }
            }

            temp_visited.remove(name);
            visited.insert(name.to_string());
            result.push(name.to_string());
        }

        for name in servers.keys() {
            visit(name, servers, &mut visited, &mut temp_visited, &mut result);
        }

        result
    }
}

impl Default for McpLifecycleManager {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl LifecycleManager for McpLifecycleManager {
    fn register_server(&self, name: &str, config: McpServerConfig) {
        let servers = self.servers.clone();
        let name = name.to_string();
        // Registration completes on a spawned task, so the server only becomes
        // visible after the runtime has polled it (see the tests, which yield
        // briefly after registering).
        tokio::spawn(async move {
            let mut servers_guard = servers.write().await;
            servers_guard.insert(name.clone(), ManagedServer::new(name, config));
        });
    }

    async fn unregister_server(&self, name: &str) -> McpResult<()> {
        // Stop the server first if running
        if self.is_running(name) {
            self.stop(
                name,
                Some(StopOptions {
                    force: true,
                    reason: Some("Unregistering server".to_string()),
                }),
            )
            .await?;
        }

        let mut servers = self.servers.write().await;
        servers.remove(name);
        Ok(())
    }

    fn set_dependencies(&self, name: &str, dependencies: Vec<String>) {
        let servers = self.servers.clone();
        let name = name.to_string();
        tokio::spawn(async move {
            let mut servers_guard = servers.write().await;
            if let Some(server) = servers_guard.get_mut(&name) {
                server.dependencies = dependencies;
            }
        });
    }

    async fn start(&self, server_name: &str, options: Option<StartOptions>) -> McpResult<()> {
        let options = options.unwrap_or_default();

        // Check if server is registered
        let config = {
            let servers = self.servers.read().await;
            let server = servers.get(server_name).ok_or_else(|| {
                McpError::lifecycle(
                    format!("Server not registered: {}", server_name),
                    Some(server_name.to_string()),
                )
            })?;

            // Check if already running
            if !options.force && server.process.state == ServerState::Running {
                return Ok(());
            }

            // Only stdio servers can be started as processes
            if server.config.transport_type != TransportType::Stdio {
                return Err(McpError::lifecycle(
                    format!(
                        "Only stdio servers can be started as processes, got {:?}",
                        server.config.transport_type
                    ),
                    Some(server_name.to_string()),
                ));
            }

            server.config.clone()
        };

        // Emit starting event
        self.emit_event(LifecycleEvent::Starting {
            server_name: server_name.to_string(),
        })
        .await;

        // Update state to starting
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Starting;
            }
        }

        // Get command and args
        let command = config.command.ok_or_else(|| {
            McpError::lifecycle(
                "Stdio server requires a command".to_string(),
                Some(server_name.to_string()),
            )
        })?;

        let args = config.args.unwrap_or_default();
        let env = config.env.unwrap_or_default();

        // Build command
        let mut cmd = Command::new(&command);
        cmd.args(&args)
            .envs(&env)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        // Spawn the process. Note that `Command::spawn` is synchronous, so the
        // surrounding timeout resolves immediately in practice; startup_timeout
        // mainly guards a degenerate zero-duration configuration here.
        let startup_timeout = self.options.startup_timeout;
        let spawn_result = tokio::time::timeout(startup_timeout, async { cmd.spawn() }).await;

        let mut child = match spawn_result {
            Ok(Ok(child)) => child,
            Ok(Err(e)) => {
                // Update state to error
                {
                    let mut servers = self.servers.write().await;
                    if let Some(server) = servers.get_mut(server_name) {
                        server.process.state = ServerState::Error;
                        server.process.last_error = Some(e.to_string());
                        server.process.consecutive_failures += 1;
                    }
                }

                self.emit_event(LifecycleEvent::Error {
                    server_name: server_name.to_string(),
                    error: e.to_string(),
                })
                .await;

                return Err(McpError::lifecycle(
                    format!("Failed to spawn process: {}", e),
                    Some(server_name.to_string()),
                ));
            }
            Err(_) => {
                // Timeout
                {
                    let mut servers = self.servers.write().await;
                    if let Some(server) = servers.get_mut(server_name) {
                        server.process.state = ServerState::Error;
                        server.process.last_error = Some("Startup timeout".to_string());
                        server.process.consecutive_failures += 1;
                    }
                }

                self.emit_event(LifecycleEvent::Error {
                    server_name: server_name.to_string(),
                    error: "Startup timeout".to_string(),
                })
                .await;

                return Err(McpError::lifecycle(
                    format!("Startup timeout after {:?}", startup_timeout),
                    Some(server_name.to_string()),
                ));
            }
        };

        // Get PID
        let pid = child.id();

        // Start output capture
        let output_handles = self.start_output_capture(server_name.to_string(), &mut child);

        // Update server state
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Running;
                server.process.pid = pid;
                server.process.started_at = Some(Utc::now());
                server.process.stopped_at = None;
                server.process.consecutive_failures = 0;
                server.child = Some(child);
                server.output_handles = output_handles;
            }
        }

        // Start exit monitor for auto-restart
        self.start_exit_monitor(server_name.to_string());

        // Start health check monitor if enabled
        if self.enable_health_checks {
            let handle = self.start_health_check_monitor(server_name.to_string());
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.health_check_handle = Some(handle);
            }
        }

        // Emit started event
        self.emit_event(LifecycleEvent::Started {
            server_name: server_name.to_string(),
            pid,
        })
        .await;

        Ok(())
    }

    async fn start_all(&self) -> McpResult<()> {
        let server_names: Vec<String> = {
            let servers = self.servers.read().await;
            self.topological_sort(&servers)
        };

        for name in server_names {
            if let Err(e) = self.start(&name, None).await {
                tracing::warn!("Failed to start server {}: {}", name, e);
            }
        }

        Ok(())
    }

    async fn start_with_dependencies(&self, server_name: &str) -> McpResult<()> {
        // Get dependencies
        let dependencies = {
            let servers = self.servers.read().await;
            servers
                .get(server_name)
                .map(|s| s.dependencies.clone())
                .unwrap_or_default()
        };

        // Start dependencies first (recursively)
        for dep in dependencies {
            self.start_with_dependencies(&dep).await?;
        }

        // Start this server
        self.start(server_name, None).await
    }

    async fn stop(&self, server_name: &str, options: Option<StopOptions>) -> McpResult<()> {
        let options = options.unwrap_or_default();

        // Check if server exists and is running
        let child_exists = {
            let servers = self.servers.read().await;
            servers
                .get(server_name)
                .map(|s| s.child.is_some())
                .unwrap_or(false)
        };

        if !child_exists {
            return Ok(());
        }

        // Emit stopping event
        self.emit_event(LifecycleEvent::Stopping {
            server_name: server_name.to_string(),
            reason: options.reason.clone(),
        })
        .await;

        // Update state to stopping
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Stopping;
            }
        }

        // Get child process
        let mut child = {
            let mut servers = self.servers.write().await;
            servers.get_mut(server_name).and_then(|s| s.child.take())
        };

        if let Some(ref mut child) = child {
            if options.force {
                // Force kill immediately
                let _ = child.kill().await;
            } else {
                // Try graceful shutdown first
                let shutdown_timeout = self.options.shutdown_timeout;

                // tokio's `Child::kill` sends SIGKILL (it does not deliver SIGTERM
                // on Unix), so there is no portable graceful signal here. Instead
                // we wait for the process to exit on its own within the timeout
                // and force-kill only if the timeout elapses.

                // Wait for graceful shutdown with timeout
                let wait_result = tokio::time::timeout(shutdown_timeout, child.wait()).await;

                match wait_result {
                    Ok(Ok(_)) => {
                        // Process exited gracefully
                    }
                    Ok(Err(e)) => {
                        tracing::warn!("Error waiting for process: {}", e);
                    }
                    Err(_) => {
                        // Timeout - force kill
                        tracing::warn!(
                            "Graceful shutdown timeout for {}, force killing",
                            server_name
                        );
                        let _ = child.kill().await;
                    }
                }
            }
        }

        // Cancel output capture handles
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                for handle in server.output_handles.drain(..) {
                    handle.abort();
                }
                if let Some(handle) = server.health_check_handle.take() {
                    handle.abort();
                }
                if let Some(handle) = server.restart_handle.take() {
                    handle.abort();
                }
            }
        }

        // Update state
        {
            let mut servers = self.servers.write().await;
            if let Some(server) = servers.get_mut(server_name) {
                server.process.state = ServerState::Stopped;
                server.process.pid = None;
                server.process.stopped_at = Some(Utc::now());
                server.child = None;
            }
        }

        // Emit stopped event
        self.emit_event(LifecycleEvent::Stopped {
            server_name: server_name.to_string(),
        })
        .await;

        Ok(())
    }

    async fn stop_all(&self, force: bool) -> McpResult<()> {
        let server_names: Vec<String> = {
            let servers = self.servers.read().await;
            // Stop in reverse dependency order
            let mut sorted = self.topological_sort(&servers);
            sorted.reverse();
            sorted
        };

        for name in server_names {
            let options = StopOptions {
                force,
                reason: Some("Stopping all servers".to_string()),
            };
            if let Err(e) = self.stop(&name, Some(options)).await {
                tracing::warn!("Failed to stop server {}: {}", name, e);
            }
        }

        Ok(())
    }

    async fn restart(&self, server_name: &str) -> McpResult<()> {
        self.stop(server_name, None).await?;
        self.start(server_name, None).await
    }

    async fn restart_all(&self) -> McpResult<()> {
        self.stop_all(false).await?;
        self.start_all().await
    }

    async fn health_check(&self, server_name: &str) -> HealthCheckResult {
        let start = std::time::Instant::now();

        let mut servers = self.servers.write().await;
        if let Some(server) = servers.get_mut(server_name) {
            if let Some(ref mut child) = server.child {
                match child.try_wait() {
                    Ok(None) => {
                        // Process is still running
                        HealthCheckResult {
                            healthy: true,
                            latency: Some(start.elapsed()),
                            last_check: Utc::now(),
                            error: None,
                        }
                    }
                    Ok(Some(status)) => {
                        // Process has exited
                        HealthCheckResult {
                            healthy: false,
                            latency: Some(start.elapsed()),
                            last_check: Utc::now(),
                            error: Some(format!("Process exited with status: {:?}", status)),
                        }
                    }
                    Err(e) => HealthCheckResult {
                        healthy: false,
                        latency: Some(start.elapsed()),
                        last_check: Utc::now(),
                        error: Some(e.to_string()),
                    },
                }
            } else {
                HealthCheckResult {
                    healthy: false,
                    latency: None,
                    last_check: Utc::now(),
                    error: Some("Server not running".to_string()),
                }
            }
        } else {
            HealthCheckResult {
                healthy: false,
                latency: None,
                last_check: Utc::now(),
                error: Some("Server not found".to_string()),
            }
        }
    }

    async fn health_check_all(&self) -> HashMap<String, HealthCheckResult> {
        let server_names: Vec<String> = {
            let servers = self.servers.read().await;
            servers.keys().cloned().collect()
        };

        let mut results = HashMap::new();
        for name in server_names {
            let result = self.health_check(&name).await;
            results.insert(name, result);
        }
        results
    }

    fn get_state(&self, server_name: &str) -> ServerState {
        self.servers
            .try_read()
            .ok()
            .and_then(|servers| servers.get(server_name).map(|s| s.process.state))
            .unwrap_or(ServerState::Stopped)
    }

    fn get_process(&self, server_name: &str) -> Option<ServerProcess> {
        self.servers
            .try_read()
            .ok()
            .and_then(|servers| servers.get(server_name).map(|s| s.process.clone()))
    }

    fn get_all_processes(&self) -> Vec<ServerProcess> {
        self.servers
            .try_read()
            .map(|servers| servers.values().map(|s| s.process.clone()).collect())
            .unwrap_or_default()
    }

    fn is_running(&self, server_name: &str) -> bool {
        self.get_state(server_name) == ServerState::Running
    }

    fn get_running_servers(&self) -> Vec<String> {
        self.servers
            .try_read()
            .map(|servers| {
                servers
                    .iter()
                    .filter(|(_, s)| s.process.state == ServerState::Running)
                    .map(|(name, _)| name.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    fn subscribe(&self) -> mpsc::Receiver<LifecycleEvent> {
        let (tx, rx) = mpsc::channel(100);
        let event_tx = self.event_tx.clone();
        // The sender is installed on a spawned task, so events emitted before
        // that task runs are dropped, and a new subscription replaces any
        // previous one.
        tokio::spawn(async move {
            *event_tx.lock().await = Some(tx);
        });
        rx
    }

    async fn cleanup(&self) -> McpResult<()> {
        // Stop all servers
        self.stop_all(true).await?;

        // Clear all servers
        let mut servers = self.servers.write().await;
        servers.clear();

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn create_test_config() -> McpServerConfig {
        McpServerConfig {
            transport_type: TransportType::Stdio,
            command: Some("echo".to_string()),
            args: Some(vec!["hello".to_string()]),
            env: None,
            url: None,
            headers: None,
            enabled: true,
            timeout: Duration::from_secs(30),
            retries: 3,
            auto_approve: vec![],
            log_level: Default::default(),
        }
    }

    #[test]
    fn test_lifecycle_manager_new() {
        let manager = McpLifecycleManager::new();
        assert!(manager.get_all_processes().is_empty());
    }

    #[test]
    fn test_lifecycle_manager_with_options() {
        let options = LifecycleOptions {
            startup_timeout: Duration::from_secs(60),
            max_restarts: 5,
            ..Default::default()
        };
        let manager = McpLifecycleManager::with_options(options);
        assert_eq!(manager.options.startup_timeout, Duration::from_secs(60));
        assert_eq!(manager.options.max_restarts, 5);
    }

    #[test]
    fn test_calculate_restart_delay() {
        let manager = McpLifecycleManager::new();

        let delay0 = manager.calculate_restart_delay(0);
        let delay1 = manager.calculate_restart_delay(1);
        let delay2 = manager.calculate_restart_delay(2);

        // Each delay should be roughly double the previous
        assert!(delay1 > delay0);
        assert!(delay2 > delay1);

        // Should not exceed 60 seconds
        let delay_max = manager.calculate_restart_delay(100);
        assert!(delay_max <= Duration::from_secs(60));
    }

    #[test]
    fn test_server_state_default() {
        let process = ServerProcess::new("test".to_string());
        assert_eq!(process.state, ServerState::Stopped);
        assert_eq!(process.restart_count, 0);
        assert!(process.pid.is_none());
    }

    #[test]
    fn test_start_options_default() {
        let options = StartOptions::default();
        assert!(!options.force);
        assert!(!options.wait_for_ready);
        assert!(options.dependencies.is_empty());
    }

    #[test]
    fn test_stop_options_default() {
        let options = StopOptions::default();
        assert!(!options.force);
        assert!(options.reason.is_none());
    }

    #[test]
    fn test_lifecycle_options_default() {
        let options = LifecycleOptions::default();
        assert_eq!(options.startup_timeout, Duration::from_secs(30));
        assert_eq!(options.shutdown_timeout, Duration::from_secs(10));
        assert_eq!(options.max_restarts, 3);
    }

    #[tokio::test]
    async fn test_register_and_get_process() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);

        // Wait for async registration
        tokio::time::sleep(Duration::from_millis(50)).await;

        let process = manager.get_process("test-server");
        assert!(process.is_some());
        assert_eq!(process.unwrap().name, "test-server");
    }

    #[tokio::test]
    async fn test_get_state_unregistered() {
        let manager = McpLifecycleManager::new();
        let state = manager.get_state("nonexistent");
        assert_eq!(state, ServerState::Stopped);
    }

    #[tokio::test]
    async fn test_is_running_not_started() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(!manager.is_running("test-server"));
    }

    #[tokio::test]
    async fn test_get_running_servers_empty() {
        let manager = McpLifecycleManager::new();
        let running = manager.get_running_servers();
        assert!(running.is_empty());
    }

    #[tokio::test]
    async fn test_set_dependencies() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("server-a", config.clone());
        manager.register_server("server-b", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        manager.set_dependencies("server-b", vec!["server-a".to_string()]);
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Verify dependencies are set
        let servers = manager.servers.read().await;
        let server_b = servers.get("server-b").unwrap();
        assert_eq!(server_b.dependencies, vec!["server-a".to_string()]);
    }

    #[tokio::test]
    async fn test_topological_sort() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        // Register servers
        manager.register_server("server-a", config.clone());
        manager.register_server("server-b", config.clone());
        manager.register_server("server-c", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Set dependencies: c depends on b, b depends on a
        manager.set_dependencies("server-c", vec!["server-b".to_string()]);
        manager.set_dependencies("server-b", vec!["server-a".to_string()]);
        tokio::time::sleep(Duration::from_millis(50)).await;

        let servers = manager.servers.read().await;
        let sorted = manager.topological_sort(&servers);

        // a should come before b, b should come before c
        let pos_a = sorted.iter().position(|x| x == "server-a").unwrap();
        let pos_b = sorted.iter().position(|x| x == "server-b").unwrap();
        let pos_c = sorted.iter().position(|x| x == "server-c").unwrap();

        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }
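
    // A sketch exercising the documented cycle behavior of topological_sort:
    // a mutual a <-> b dependency still yields each name exactly once instead
    // of recursing forever (the back-edge is skipped).
    #[tokio::test]
    async fn test_topological_sort_cycle_does_not_hang() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("server-a", config.clone());
        manager.register_server("server-b", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        manager.set_dependencies("server-a", vec!["server-b".to_string()]);
        manager.set_dependencies("server-b", vec!["server-a".to_string()]);
        tokio::time::sleep(Duration::from_millis(50)).await;

        let servers = manager.servers.read().await;
        let sorted = manager.topological_sort(&servers);
        assert_eq!(sorted.len(), 2);
    }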

    #[tokio::test]
    async fn test_unregister_server() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(manager.get_process("test-server").is_some());

        manager.unregister_server("test-server").await.unwrap();

        assert!(manager.get_process("test-server").is_none());
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("server-1", config.clone());
        manager.register_server("server-2", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(manager.get_all_processes().len(), 2);

        manager.cleanup().await.unwrap();

        assert!(manager.get_all_processes().is_empty());
    }

    #[tokio::test]
    async fn test_health_check_not_running() {
        let manager = McpLifecycleManager::new();
        let config = create_test_config();

        manager.register_server("test-server", config);
        tokio::time::sleep(Duration::from_millis(50)).await;

        let result = manager.health_check("test-server").await;
        assert!(!result.healthy);
        assert!(result.error.is_some());
    }

    #[tokio::test]
    async fn test_health_check_nonexistent() {
        let manager = McpLifecycleManager::new();

        let result = manager.health_check("nonexistent").await;
        assert!(!result.healthy);
        assert!(result.error.unwrap().contains("not found"));
    }

    #[tokio::test]
    async fn test_subscribe_events() {
        let manager = McpLifecycleManager::new();
        let _rx = manager.subscribe();

        // Just verify subscription works without panic
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}