mcp_runner/server/monitor.rs

use crate::error::{Error, Result};
use crate::server::lifecycle::{ServerLifecycleEvent, ServerLifecycleManager};
use crate::server::{ServerId, ServerStatus};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time;
use tracing;

/// Server health status
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerHealth {
    /// Server is healthy
    Healthy,
    /// Server is degraded
    Degraded,
    /// Server is unhealthy
    Unhealthy,
    /// Server health is unknown
    Unknown,
}

/// Server monitor configuration
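///
/// # Example
///
/// A minimal construction sketch (not part of the original docs); it only uses
/// the fields defined below and overrides a couple of the defaults.
///
/// ```ignore
/// use std::time::Duration;
///
/// let config = ServerMonitorConfig {
///     check_interval: Duration::from_secs(10),
///     auto_restart: true,
///     ..Default::default()
/// };
/// ```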
#[derive(Debug, Clone)]
pub struct ServerMonitorConfig {
    /// Interval between health checks
    pub check_interval: Duration,
    /// Timeout for a single health check
    pub health_check_timeout: Duration,
    /// Maximum number of consecutive failures before marking a server as unhealthy
    pub max_consecutive_failures: u32,
    /// Automatically restart unhealthy servers
    pub auto_restart: bool,
}

impl Default for ServerMonitorConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(5),
            max_consecutive_failures: 3,
            auto_restart: false,
        }
    }
}

/// Monitors the health of running MCP servers.
///
/// Periodically checks the status of registered servers and can optionally
/// trigger restarts based on configuration.
/// All public methods are instrumented with `tracing` spans.
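///
/// # Example
///
/// A minimal usage sketch (illustrative, not from the original docs); it
/// assumes an `Arc<ServerLifecycleManager>` named `lifecycle` is already
/// available from elsewhere in the crate.
///
/// ```ignore
/// use std::sync::Arc;
///
/// // `lifecycle` is an Arc<ServerLifecycleManager> obtained elsewhere (assumed).
/// let config = ServerMonitorConfig::default();
/// let mut monitor = ServerMonitor::new(Arc::clone(&lifecycle), config);
/// monitor.start()?;
/// // ... later, during shutdown ...
/// monitor.stop()?;
/// ```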
pub struct ServerMonitor {
    /// Lifecycle manager
    lifecycle_manager: Arc<ServerLifecycleManager>,
    /// Server health statuses
    health_statuses: Arc<Mutex<HashMap<ServerId, ServerHealth>>>,
    /// Server failure counts
    failure_counts: Arc<Mutex<HashMap<ServerId, u32>>>,
    /// Server last checked times
    last_checked: Arc<Mutex<HashMap<ServerId, Instant>>>,
    /// Monitor configuration
    config: ServerMonitorConfig,
    /// Monitor task
    monitor_task: Option<JoinHandle<()>>,
    /// Running flag
    running: Arc<Mutex<bool>>,
}

impl ServerMonitor {
    /// Create a new server monitor
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(lifecycle_manager, config))]
    pub fn new(
        lifecycle_manager: Arc<ServerLifecycleManager>,
        config: ServerMonitorConfig,
    ) -> Self {
        tracing::info!(config = ?config, "Creating new ServerMonitor");
        Self {
            lifecycle_manager,
            health_statuses: Arc::new(Mutex::new(HashMap::new())),
            failure_counts: Arc::new(Mutex::new(HashMap::new())),
            last_checked: Arc::new(Mutex::new(HashMap::new())),
            config,
            monitor_task: None,
            running: Arc::new(Mutex::new(false)),
        }
    }

    /// Start the monitor
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(self))]
    pub fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if *running {
                tracing::debug!("Monitor already running");
                return Ok(());
            }
            tracing::info!("Starting monitor task");
            *running = true;
        }

        let lifecycle_manager = Arc::clone(&self.lifecycle_manager);
        let health_statuses = Arc::clone(&self.health_statuses);
        let failure_counts = Arc::clone(&self.failure_counts);
        let last_checked = Arc::clone(&self.last_checked);
        let running = Arc::clone(&self.running);
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            tracing::info!("Monitor loop started");
            let mut interval = time::interval(config.check_interval);

            loop {
                interval.tick().await;

                // Check if we should stop
                {
                    let running_guard = running.lock().unwrap();
                    if !*running_guard {
                        tracing::info!("Monitor loop stopping");
                        break;
                    }
                }
                tracing::debug!("Running health check cycle");

                // Get a snapshot of servers we need to check
                let server_ids_to_check = {
                    let health_guard = health_statuses.lock().unwrap();
                    health_guard.keys().cloned().collect::<Vec<_>>()
                };
                tracing::trace!(servers = ?server_ids_to_check, "Checking health for servers");

                // Check each server
                for server_id in server_ids_to_check {
                    let check_span =
                        tracing::info_span!("server_health_check", server_id = %server_id);
                    let _check_guard = check_span.enter();

                    tracing::debug!("Checking server health");
                    // Record check time
                    {
                        let mut checked = last_checked.lock().unwrap();
                        checked.insert(server_id, Instant::now());
                    }

                    // Get current server status
                    match lifecycle_manager.get_status(server_id) {
                        Ok(status) => {
                            tracing::debug!(current_status = ?status, "Got server status");
                            let health = match status {
                                ServerStatus::Running => ServerHealth::Healthy,
                                ServerStatus::Failed => ServerHealth::Unhealthy,
                                _ => ServerHealth::Unknown,
                            };
                            tracing::info!(health = ?health, "Determined server health");

                            {
                                let mut statuses = health_statuses.lock().unwrap();
                                statuses.insert(server_id, health);
                            }

                            {
                                let mut counts = failure_counts.lock().unwrap();
                                if health == ServerHealth::Unhealthy {
                                    let count = counts.entry(server_id).or_insert(0);
                                    *count += 1;
                                    tracing::warn!(
                                        failure_count = *count,
                                        "Server health check failed"
                                    );

                                    if config.auto_restart
                                        && *count >= config.max_consecutive_failures
                                    {
                                        tracing::warn!(
                                            threshold = config.max_consecutive_failures,
                                            "Failure threshold reached, attempting auto-restart"
                                        );
                                        *count = 0;
                                        tracing::info!(
                                            "Auto-restart triggered (logic not implemented)"
                                        );
                                    }
                                } else if counts.get(&server_id).copied().unwrap_or(0) > 0 {
                                    tracing::info!("Resetting failure count");
                                    counts.insert(server_id, 0);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "Failed to get server status during health check");
                        }
                    }
                }
            }
            tracing::info!("Monitor loop finished");
        });

        self.monitor_task = Some(task);

        Ok(())
    }

    /// Stop the monitor
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(self))]
    pub fn stop(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if !*running {
                tracing::debug!("Monitor already stopped");
                return Ok(());
            }
            tracing::info!("Stopping monitor task");
            *running = false;
        }

        if let Some(task) = self.monitor_task.take() {
            tracing::debug!("Aborting monitor task handle");
            task.abort();
        }

        Ok(())
    }

    /// Get server health
    ///
    /// This method is instrumented with `tracing`.
    #[tracing::instrument(skip(self), fields(server_id = %id))]
    pub fn get_health(&self, id: ServerId) -> Result<ServerHealth> {
        tracing::debug!("Getting server health status");
        let health_statuses = self.health_statuses.lock().map_err(|_| {
            tracing::error!("Failed to lock health statuses");
            Error::Other("Failed to lock health statuses".to_string())
        })?;

        health_statuses.get(&id).copied().ok_or_else(|| {
            tracing::warn!("Health status requested for unknown server");
            Error::ServerNotFound(format!("{:?}", id))
        })
    }

    /// Force health check for a server
    ///
    /// This method is instrumented with `tracing`.
    ///
    /// Note: this is a simplified check; a real health check might involve
    /// communicating with the server process.
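    ///
    /// # Example
    ///
    /// A call sketch (illustrative only, not from the original docs); `id` is
    /// assumed to be a `ServerId` the caller already tracks, and `"my-server"`
    /// is a placeholder name.
    ///
    /// ```ignore
    /// let health = monitor.check_health(id, "my-server").await?;
    /// if health == ServerHealth::Unhealthy {
    ///     // react here, e.g. alert or schedule a manual restart
    /// }
    /// ```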
    #[tracing::instrument(skip(self), fields(server_id = %id, server_name = %name))]
    pub async fn check_health(&self, id: ServerId, name: &str) -> Result<ServerHealth> {
        tracing::info!("Forcing health check");

        {
            let mut last_checked = self
                .last_checked
                .lock()
                .map_err(|_| Error::Other("Failed to lock last checked times".to_string()))?;
            last_checked.insert(id, Instant::now());
        }

        let status = self.lifecycle_manager.get_status(id)?;
        tracing::debug!(current_status = ?status, "Got server status for forced check");

        let health = match status {
            ServerStatus::Running => ServerHealth::Healthy,
            ServerStatus::Starting => ServerHealth::Unknown,
            ServerStatus::Stopping => ServerHealth::Unknown,
            ServerStatus::Stopped => ServerHealth::Unknown,
            ServerStatus::Failed => ServerHealth::Unhealthy,
        };
        tracing::info!(health = ?health, "Determined server health from forced check");

        {
            let mut health_statuses = self
                .health_statuses
                .lock()
                .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;
            health_statuses.insert(id, health);
        }

        {
            let mut failure_counts = self
                .failure_counts
                .lock()
                .map_err(|_| Error::Other("Failed to lock failure counts".to_string()))?;

            if health == ServerHealth::Unhealthy {
                let count = failure_counts.entry(id).or_insert(0);
                *count += 1;
                tracing::warn!(
                    failure_count = *count,
                    "Server health check failed (forced)"
                );

                if self.config.auto_restart && *count >= self.config.max_consecutive_failures {
                    tracing::warn!(
                        threshold = self.config.max_consecutive_failures,
                        "Failure threshold reached (forced), attempting auto-restart"
                    );
                    *count = 0;

                    self.lifecycle_manager.record_event(
                        id,
                        name.to_string(),
                        ServerLifecycleEvent::Restarted,
                        Some("Auto-restart after consecutive failures (forced check)".to_string()),
                    )?;
                    tracing::info!(
                        "Auto-restart triggered by forced check (logic not implemented)"
                    );
                }
            } else if failure_counts.get(&id).copied().unwrap_or(0) > 0 {
                tracing::info!("Resetting failure count (forced check)");
                failure_counts.insert(id, 0);
            }
        }

        Ok(health)
    }

    /// Get all health statuses
    ///
    /// This method is instrumented with `tracing`.
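    ///
    /// # Example
    ///
    /// An iteration sketch (illustrative only); assumes a `monitor` value that
    /// has already been checking servers.
    ///
    /// ```ignore
    /// for (id, health) in monitor.get_all_health()? {
    ///     println!("{:?}: {:?}", id, health);
    /// }
    /// ```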
    #[tracing::instrument(skip(self))]
    pub fn get_all_health(&self) -> Result<HashMap<ServerId, ServerHealth>> {
        let health_statuses = self
            .health_statuses
            .lock()
            .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;

        Ok(health_statuses.clone())
    }
}