mcp_runner/server/
monitor.rs

use crate::error::{Error, Result};
use crate::server::lifecycle::{ServerLifecycleEvent, ServerLifecycleManager};
use crate::server::{ServerId, ServerStatus};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time;
use tracing;

/// Server health status
///
/// Represents the health state of a server based on monitoring checks.
/// This is used by the `ServerMonitor` to track and report on server health.
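///
/// # Example
///
/// A small sketch of branching on a reported health value; `health` here is a
/// placeholder for a value obtained from a `ServerMonitor`:
///
/// ```ignore
/// match health {
///     ServerHealth::Healthy => {}
///     ServerHealth::Degraded => tracing::warn!("server is degraded"),
///     ServerHealth::Unhealthy | ServerHealth::Unknown => {
///         tracing::error!("server needs attention")
///     }
/// }
/// ```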
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerHealth {
    /// Server is healthy and operating normally
    Healthy,
    /// Server is operational but performing sub-optimally
    Degraded,
    /// Server is not functioning correctly
    Unhealthy,
    /// Server health could not be determined
    Unknown,
}

/// Server monitor configuration
///
/// Defines the parameters used by the `ServerMonitor` to determine how and when
/// to check server health, and what actions to take based on health status.
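///
/// # Example
///
/// A minimal sketch of overriding a couple of defaults (import paths are assumed
/// and may need adjusting to this crate's re-exports):
///
/// ```ignore
/// use std::time::Duration;
///
/// let config = ServerMonitorConfig {
///     check_interval: Duration::from_secs(10),
///     auto_restart: true,
///     ..Default::default()
/// };
/// ```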
#[derive(Debug, Clone)]
pub struct ServerMonitorConfig {
    /// How often to check server health (interval between checks)
    pub check_interval: Duration,
    /// Maximum time to wait for a health check to complete
    pub health_check_timeout: Duration,
    /// Number of consecutive failed checks before marking a server as unhealthy
    pub max_consecutive_failures: u32,
    /// Whether to automatically restart unhealthy servers
    pub auto_restart: bool,
}

impl Default for ServerMonitorConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(5),
            max_consecutive_failures: 3,
            auto_restart: false,
        }
    }
}

/// Monitors the health of running MCP servers.
///
/// Periodically checks the status of registered servers and can optionally
/// trigger restarts based on configuration.
/// Most public methods are instrumented with `tracing` spans.
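///
/// # Example
///
/// A rough usage sketch; how the `ServerLifecycleManager` and `server_id` are
/// obtained depends on the rest of the crate, and they are only placeholders here:
///
/// ```ignore
/// let lifecycle_manager: Arc<ServerLifecycleManager> = /* obtained elsewhere */;
/// let mut monitor = ServerMonitor::new(lifecycle_manager, ServerMonitorConfig::default());
/// monitor.start()?;
/// // Seed the monitor with a server by forcing an initial check.
/// monitor.check_health(server_id, "my-server").await?;
/// // ... later, during shutdown ...
/// monitor.stop()?;
/// ```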
pub struct ServerMonitor {
    /// Lifecycle manager
    lifecycle_manager: Arc<ServerLifecycleManager>,
    /// Server health statuses
    health_statuses: Arc<Mutex<HashMap<ServerId, ServerHealth>>>,
    /// Server failure counts
    failure_counts: Arc<Mutex<HashMap<ServerId, u32>>>,
    /// Server last checked times
    last_checked: Arc<Mutex<HashMap<ServerId, Instant>>>,
    /// Monitor configuration
    config: ServerMonitorConfig,
    /// Monitor task
    monitor_task: Option<JoinHandle<()>>,
    /// Running flag
    running: Arc<Mutex<bool>>,
}

impl ServerMonitor {
    /// Create a new server monitor
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(lifecycle_manager, config))]
    pub fn new(
        lifecycle_manager: Arc<ServerLifecycleManager>,
        config: ServerMonitorConfig,
    ) -> Self {
        tracing::info!(config = ?config, "Creating new ServerMonitor");
        Self {
            lifecycle_manager,
            health_statuses: Arc::new(Mutex::new(HashMap::new())),
            failure_counts: Arc::new(Mutex::new(HashMap::new())),
            last_checked: Arc::new(Mutex::new(HashMap::new())),
            config,
            monitor_task: None,
            running: Arc::new(Mutex::new(false)),
        }
    }

    /// Start the monitor
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(self))]
    pub fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if *running {
                tracing::debug!("Monitor already running");
                return Ok(());
            }
            tracing::info!("Starting monitor task");
            *running = true;
        }

        let lifecycle_manager = Arc::clone(&self.lifecycle_manager);
        let health_statuses = Arc::clone(&self.health_statuses);
        let failure_counts = Arc::clone(&self.failure_counts);
        let last_checked = Arc::clone(&self.last_checked);
        let running = Arc::clone(&self.running);
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            tracing::info!("Monitor loop started");
            let mut interval = time::interval(config.check_interval);

            loop {
                interval.tick().await;

                // Check if we should stop
                {
                    let running_guard = running.lock().unwrap();
                    if !*running_guard {
                        tracing::info!("Monitor loop stopping");
                        break;
                    }
                }
                tracing::debug!("Running health check cycle");

                // Get a snapshot of servers we need to check
                let server_ids_to_check = {
                    let health_guard = health_statuses.lock().unwrap();
                    health_guard.keys().cloned().collect::<Vec<_>>()
                };
                tracing::trace!(servers = ?server_ids_to_check, "Checking health for servers");

                // Check each server
                for server_id in server_ids_to_check {
                    let check_span =
                        tracing::info_span!("server_health_check", server_id = %server_id);
                    let _check_guard = check_span.enter();

                    tracing::debug!("Checking server health");
                    // Record check time
                    {
                        let mut checked = last_checked.lock().unwrap();
                        checked.insert(server_id, Instant::now());
                    }

                    // Get current server status
                    match lifecycle_manager.get_status(server_id) {
                        Ok(status) => {
                            tracing::debug!(current_status = ?status, "Got server status");
                            let health = match status {
                                ServerStatus::Running => ServerHealth::Healthy,
                                ServerStatus::Failed => ServerHealth::Unhealthy,
                                _ => ServerHealth::Unknown,
                            };
                            tracing::info!(health = ?health, "Determined server health");

                            {
                                let mut statuses = health_statuses.lock().unwrap();
                                statuses.insert(server_id, health);
                            }

                            {
                                let mut counts = failure_counts.lock().unwrap();
                                if health == ServerHealth::Unhealthy {
                                    let count = counts.entry(server_id).or_insert(0);
                                    *count += 1;
                                    tracing::warn!(
                                        failure_count = *count,
                                        "Server health check failed"
                                    );

                                    if config.auto_restart
                                        && *count >= config.max_consecutive_failures
                                    {
                                        tracing::warn!(
                                            threshold = config.max_consecutive_failures,
                                            "Failure threshold reached, attempting auto-restart"
                                        );
                                        *count = 0;
                                        tracing::info!(
                                            "Auto-restart triggered (logic not implemented)"
                                        );
                                    }
                                } else if counts.contains_key(&server_id)
                                    && *counts.get(&server_id).unwrap() > 0
                                {
                                    tracing::info!("Resetting failure count");
                                    counts.insert(server_id, 0);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "Failed to get server status during health check");
                        }
                    }
                }
            }
            tracing::info!("Monitor loop finished");
        });

        self.monitor_task = Some(task);

        Ok(())
    }

    /// Stop the monitor
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(self))]
    pub fn stop(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if !*running {
                tracing::debug!("Monitor already stopped");
                return Ok(());
            }
            tracing::info!("Stopping monitor task");
            *running = false;
        }

        if let Some(task) = self.monitor_task.take() {
            tracing::debug!("Aborting monitor task handle");
            task.abort();
        }

        Ok(())
    }

    /// Get server health
    ///
    /// This method is instrumented with `tracing`.
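    ///
    /// # Example
    ///
    /// A sketch of querying one server's health; `monitor` and `server_id` are
    /// placeholders for values obtained elsewhere:
    ///
    /// ```ignore
    /// match monitor.get_health(server_id) {
    ///     Ok(ServerHealth::Healthy) => {}
    ///     Ok(other) => tracing::warn!(health = ?other, "server not healthy"),
    ///     Err(e) => tracing::error!(error = %e, "server is not being monitored"),
    /// }
    /// ```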
    #[tracing::instrument(skip(self), fields(server_id = %id))]
    pub fn get_health(&self, id: ServerId) -> Result<ServerHealth> {
        tracing::debug!("Getting server health status");
        let health_statuses = self.health_statuses.lock().map_err(|_| {
            tracing::error!("Failed to lock health statuses");
            Error::Other("Failed to lock health statuses".to_string())
        })?;

        health_statuses.get(&id).copied().ok_or_else(|| {
            tracing::warn!("Health status requested for unknown server");
            Error::ServerNotFound(format!("{:?}", id))
        })
    }

    /// Force health check for a server
    ///
    /// This method is instrumented with `tracing`.
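    ///
    /// # Example
    ///
    /// A sketch of forcing a check from async code; `monitor` and `server_id` are
    /// placeholders:
    ///
    /// ```ignore
    /// let health = monitor.check_health(server_id, "my-server").await?;
    /// tracing::info!(health = ?health, "forced health check complete");
    /// ```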
    // Note: This is a simplified check; a real health check might involve communicating with the server.
    #[tracing::instrument(skip(self), fields(server_id = %id, server_name = %name))]
    pub async fn check_health(&self, id: ServerId, name: &str) -> Result<ServerHealth> {
        tracing::info!("Forcing health check");

        {
            let mut last_checked = self
                .last_checked
                .lock()
                .map_err(|_| Error::Other("Failed to lock last checked times".to_string()))?;
            last_checked.insert(id, Instant::now());
        }

        let status = self.lifecycle_manager.get_status(id)?;
        tracing::debug!(current_status = ?status, "Got server status for forced check");

        let health = match status {
            ServerStatus::Running => ServerHealth::Healthy,
            ServerStatus::Starting => ServerHealth::Unknown,
            ServerStatus::Stopping => ServerHealth::Unknown,
            ServerStatus::Stopped => ServerHealth::Unknown,
            ServerStatus::Failed => ServerHealth::Unhealthy,
        };
        tracing::info!(health = ?health, "Determined server health from forced check");

        {
            let mut health_statuses = self
                .health_statuses
                .lock()
                .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;
            health_statuses.insert(id, health);
        }

        {
            let mut failure_counts = self
                .failure_counts
                .lock()
                .map_err(|_| Error::Other("Failed to lock failure counts".to_string()))?;

            if health == ServerHealth::Unhealthy {
                let count = failure_counts.entry(id).or_insert(0);
                *count += 1;
                tracing::warn!(
                    failure_count = *count,
                    "Server health check failed (forced)"
                );

                if self.config.auto_restart && *count >= self.config.max_consecutive_failures {
                    tracing::warn!(
                        threshold = self.config.max_consecutive_failures,
                        "Failure threshold reached (forced), attempting auto-restart"
                    );
                    *count = 0;

                    self.lifecycle_manager.record_event(
                        id,
                        name.to_string(),
                        ServerLifecycleEvent::Restarted,
                        Some("Auto-restart after consecutive failures (forced check)".to_string()),
                    )?;
                    tracing::info!(
                        "Auto-restart triggered by forced check (logic not implemented)"
                    );
                }
            } else if failure_counts.contains_key(&id) && *failure_counts.get(&id).unwrap() > 0 {
                tracing::info!("Resetting failure count (forced check)");
                failure_counts.insert(id, 0);
            }
        }

        Ok(health)
    }

    /// Get all health statuses
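    ///
    /// # Example
    ///
    /// A sketch of iterating the current snapshot of health statuses; `monitor`
    /// is a placeholder for a running `ServerMonitor`:
    ///
    /// ```ignore
    /// for (id, health) in monitor.get_all_health()? {
    ///     tracing::info!(server_id = %id, health = ?health, "monitored server");
    /// }
    /// ```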
    pub fn get_all_health(&self) -> Result<HashMap<ServerId, ServerHealth>> {
        let health_statuses = self
            .health_statuses
            .lock()
            .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;

        Ok(health_statuses.clone())
    }
}