mcp_runner/server/monitor.rs

use crate::error::{Error, Result};
use crate::server::lifecycle::{ServerLifecycleEvent, ServerLifecycleManager};
use crate::server::{ServerId, ServerStatus};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time;
use tracing;

/// Health of a monitored server, as determined by the monitor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerHealth {
    /// The server is running normally.
    Healthy,
    /// The server is running but not fully healthy.
    Degraded,
    /// The server has failed or is not responding.
    Unhealthy,
    /// The server's health has not yet been determined.
    Unknown,
}

/// Configuration for the server monitor.
#[derive(Debug, Clone)]
pub struct ServerMonitorConfig {
    /// How often the background task runs a health check cycle.
    pub check_interval: Duration,
    /// Timeout applied to an individual health check.
    pub health_check_timeout: Duration,
    /// Number of consecutive failed checks after which auto-restart is considered.
    pub max_consecutive_failures: u32,
    /// Whether to attempt an automatic restart after repeated failures.
    pub auto_restart: bool,
}

impl Default for ServerMonitorConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(5),
            max_consecutive_failures: 3,
            auto_restart: false,
        }
    }
}

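// Illustrative example (not part of the original file): overriding selected
// defaults with struct update syntax to get a faster, auto-restarting monitor.
//
//     let config = ServerMonitorConfig {
//         check_interval: Duration::from_secs(10),
//         max_consecutive_failures: 5,
//         auto_restart: true,
//         ..Default::default()
//     };
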
/// Monitors the health of managed servers on a fixed interval.
pub struct ServerMonitor {
    /// Lifecycle manager used to query server status and record events.
    lifecycle_manager: Arc<ServerLifecycleManager>,
    /// Last known health status for each monitored server.
    health_statuses: Arc<Mutex<HashMap<ServerId, ServerHealth>>>,
    /// Count of consecutive failed health checks per server.
    failure_counts: Arc<Mutex<HashMap<ServerId, u32>>>,
    /// Time of the most recent health check per server.
    last_checked: Arc<Mutex<HashMap<ServerId, Instant>>>,
    /// Monitor configuration.
    config: ServerMonitorConfig,
    /// Handle to the background monitoring task, if one is running.
    monitor_task: Option<JoinHandle<()>>,
    /// Flag telling the background task whether to keep running.
    running: Arc<Mutex<bool>>,
}

impl ServerMonitor {
    /// Creates a new monitor using the given lifecycle manager and configuration.
    #[tracing::instrument(skip(lifecycle_manager, config))]
    pub fn new(
        lifecycle_manager: Arc<ServerLifecycleManager>,
        config: ServerMonitorConfig,
    ) -> Self {
        tracing::info!(config = ?config, "Creating new ServerMonitor");
        Self {
            lifecycle_manager,
            health_statuses: Arc::new(Mutex::new(HashMap::new())),
            failure_counts: Arc::new(Mutex::new(HashMap::new())),
            last_checked: Arc::new(Mutex::new(HashMap::new())),
            config,
            monitor_task: None,
            running: Arc::new(Mutex::new(false)),
        }
    }

    /// Starts the background monitoring task if it is not already running.
    #[tracing::instrument(skip(self))]
    pub fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if *running {
                tracing::debug!("Monitor already running");
                return Ok(());
            }
            tracing::info!("Starting monitor task");
            *running = true;
        }

        // Clone handles to the shared state for the background task.
        let lifecycle_manager = Arc::clone(&self.lifecycle_manager);
        let health_statuses = Arc::clone(&self.health_statuses);
        let failure_counts = Arc::clone(&self.failure_counts);
        let last_checked = Arc::clone(&self.last_checked);
        let running = Arc::clone(&self.running);
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            tracing::info!("Monitor loop started");
            let mut interval = time::interval(config.check_interval);

            loop {
                interval.tick().await;

                {
                    // Stop the loop once the running flag has been cleared.
                    let running_guard = running.lock().unwrap();
                    if !*running_guard {
                        tracing::info!("Monitor loop stopping");
                        break;
                    }
                }
                tracing::debug!("Running health check cycle");

                let server_ids_to_check = {
                    // Snapshot the known server ids so the lock is not held during checks.
                    let health_guard = health_statuses.lock().unwrap();
                    health_guard.keys().cloned().collect::<Vec<_>>()
                };
                tracing::trace!(servers = ?server_ids_to_check, "Checking health for servers");

                for server_id in server_ids_to_check {
                    // Each server gets its own tracing span for the check.
                    let check_span =
                        tracing::info_span!("server_health_check", server_id = %server_id);
                    let _check_guard = check_span.enter();

                    tracing::debug!("Checking server health");
                    {
                        // Record when this server was last checked.
                        let mut checked = last_checked.lock().unwrap();
                        checked.insert(server_id, Instant::now());
                    }

                    match lifecycle_manager.get_status(server_id) {
                        Ok(status) => {
                            tracing::debug!(current_status = ?status, "Got server status");
                            let health = match status {
                                ServerStatus::Running => ServerHealth::Healthy,
                                ServerStatus::Failed => ServerHealth::Unhealthy,
                                _ => ServerHealth::Unknown,
                            };
                            tracing::info!(health = ?health, "Determined server health");

                            {
                                let mut statuses = health_statuses.lock().unwrap();
                                statuses.insert(server_id, health);
                            }

                            {
                                let mut counts = failure_counts.lock().unwrap();
                                if health == ServerHealth::Unhealthy {
                                    let count = counts.entry(server_id).or_insert(0);
                                    *count += 1;
                                    tracing::warn!(
                                        failure_count = *count,
                                        "Server health check failed"
                                    );

                                    if config.auto_restart
                                        && *count >= config.max_consecutive_failures
                                    {
                                        tracing::warn!(
                                            threshold = config.max_consecutive_failures,
                                            "Failure threshold reached, attempting auto-restart"
                                        );
                                        *count = 0;
                                        tracing::info!(
                                            "Auto-restart triggered (logic not implemented)"
                                        );
                                    }
                                } else if counts.contains_key(&server_id)
                                    && *counts.get(&server_id).unwrap() > 0
                                {
                                    tracing::info!("Resetting failure count");
                                    counts.insert(server_id, 0);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "Failed to get server status during health check");
                        }
                    }
                }
            }
            tracing::info!("Monitor loop finished");
        });

        self.monitor_task = Some(task);

        Ok(())
    }

    /// Stops the background monitoring task if it is running.
    #[tracing::instrument(skip(self))]
    pub fn stop(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if !*running {
                tracing::debug!("Monitor already stopped");
                return Ok(());
            }
            tracing::info!("Stopping monitor task");
            *running = false;
        }

        if let Some(task) = self.monitor_task.take() {
            tracing::debug!("Aborting monitor task handle");
            task.abort();
        }

        Ok(())
    }

    /// Returns the last recorded health status for the given server.
    #[tracing::instrument(skip(self), fields(server_id = %id))]
    pub fn get_health(&self, id: ServerId) -> Result<ServerHealth> {
        tracing::debug!("Getting server health status");
        let health_statuses = self.health_statuses.lock().map_err(|_| {
            tracing::error!("Failed to lock health statuses");
            Error::Other("Failed to lock health statuses".to_string())
        })?;

        health_statuses.get(&id).copied().ok_or_else(|| {
            tracing::warn!("Health status requested for unknown server");
            Error::ServerNotFound(format!("{:?}", id))
        })
    }

    /// Forces an immediate health check for the given server and updates the
    /// recorded health status and failure count.
    #[tracing::instrument(skip(self), fields(server_id = %id, server_name = %name))]
    pub async fn check_health(&self, id: ServerId, name: &str) -> Result<ServerHealth> {
        tracing::info!("Forcing health check");

        {
            let mut last_checked = self
                .last_checked
                .lock()
                .map_err(|_| Error::Other("Failed to lock last checked times".to_string()))?;
            last_checked.insert(id, Instant::now());
        }

        let status = self.lifecycle_manager.get_status(id)?;
        tracing::debug!(current_status = ?status, "Got server status for forced check");

        let health = match status {
            ServerStatus::Running => ServerHealth::Healthy,
            ServerStatus::Starting => ServerHealth::Unknown,
            ServerStatus::Stopping => ServerHealth::Unknown,
            ServerStatus::Stopped => ServerHealth::Unknown,
            ServerStatus::Failed => ServerHealth::Unhealthy,
        };
        tracing::info!(health = ?health, "Determined server health from forced check");

        {
            let mut health_statuses = self
                .health_statuses
                .lock()
                .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;
            health_statuses.insert(id, health);
        }

        {
            let mut failure_counts = self
                .failure_counts
                .lock()
                .map_err(|_| Error::Other("Failed to lock failure counts".to_string()))?;

            if health == ServerHealth::Unhealthy {
                let count = failure_counts.entry(id).or_insert(0);
                *count += 1;
                tracing::warn!(
                    failure_count = *count,
                    "Server health check failed (forced)"
                );

                if self.config.auto_restart && *count >= self.config.max_consecutive_failures {
                    tracing::warn!(
                        threshold = self.config.max_consecutive_failures,
                        "Failure threshold reached (forced), attempting auto-restart"
                    );
                    *count = 0;

                    self.lifecycle_manager.record_event(
                        id,
                        name.to_string(),
                        ServerLifecycleEvent::Restarted,
                        Some("Auto-restart after consecutive failures (forced check)".to_string()),
                    )?;
                    tracing::info!(
                        "Auto-restart triggered by forced check (logic not implemented)"
                    );
                }
            } else if failure_counts.contains_key(&id) && *failure_counts.get(&id).unwrap() > 0 {
                tracing::info!("Resetting failure count (forced check)");
                failure_counts.insert(id, 0);
            }
        }

        Ok(health)
    }

    /// Returns a snapshot of the current health status of all monitored servers.
    pub fn get_all_health(&self) -> Result<HashMap<ServerId, ServerHealth>> {
        let health_statuses = self
            .health_statuses
            .lock()
            .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;

        Ok(health_statuses.clone())
    }
}
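
// Usage sketch (illustrative, not from the original file). It assumes a
// `ServerLifecycleManager::new()` constructor and a running Tokio runtime,
// both of which live outside this module and may differ in the real crate:
//
//     let lifecycle = Arc::new(ServerLifecycleManager::new());
//     let mut monitor = ServerMonitor::new(lifecycle, ServerMonitorConfig::default());
//     monitor.start()?;
//     // ... on shutdown:
//     monitor.stop()?;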