mcp_runner/server/monitor.rs
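//! Health monitoring for managed servers: a background task polls the
//! [`ServerLifecycleManager`] for each tracked server, caches a
//! [`ServerHealth`] value per server, and counts consecutive failures for
//! optional auto-restart handling.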
use crate::error::{Error, Result};
use crate::server::lifecycle::{ServerLifecycleEvent, ServerLifecycleManager};
use crate::server::{ServerId, ServerStatus};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time;
use tracing;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerHealth {
    /// Server is running and responding normally.
    Healthy,
    /// Server is running but not fully healthy (not currently assigned by the checks in this module).
    Degraded,
    /// Server has failed or is failing health checks.
    Unhealthy,
    /// Health could not be determined.
    Unknown,
}

#[derive(Debug, Clone)]
pub struct ServerMonitorConfig {
    /// How often the background loop polls server status.
    pub check_interval: Duration,
    /// Timeout for a single health check (not yet applied by the checks below).
    pub health_check_timeout: Duration,
    /// Consecutive failures tolerated before an auto-restart is considered.
    pub max_consecutive_failures: u32,
    /// Whether to attempt an automatic restart after repeated failures.
    pub auto_restart: bool,
}
impl Default for ServerMonitorConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(5),
            max_consecutive_failures: 3,
            auto_restart: false,
        }
    }
}

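/// Background health monitor for servers managed by a [`ServerLifecycleManager`].
///
/// Health results are cached per [`ServerId`] along with a consecutive-failure
/// count. A minimal usage sketch (the `ServerLifecycleManager::new()` call and
/// `server_id` below are assumptions for illustration; adjust to the real
/// constructor and to however server IDs are obtained):
///
/// ```ignore
/// let lifecycle = Arc::new(ServerLifecycleManager::new());
/// let mut monitor = ServerMonitor::new(lifecycle, ServerMonitorConfig::default());
/// monitor.start()?;                 // spawn the periodic check loop
/// let health = monitor.check_health(server_id, "example-server").await?;
/// monitor.stop()?;                  // signal the loop and abort the task
/// ```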
pub struct ServerMonitor {
    /// Lifecycle manager used to query server status and record events.
    lifecycle_manager: Arc<ServerLifecycleManager>,
    /// Last known health per server.
    health_statuses: Arc<Mutex<HashMap<ServerId, ServerHealth>>>,
    /// Consecutive failure count per server.
    failure_counts: Arc<Mutex<HashMap<ServerId, u32>>>,
    /// Time of the most recent check per server.
    last_checked: Arc<Mutex<HashMap<ServerId, Instant>>>,
    /// Monitor configuration.
    config: ServerMonitorConfig,
    /// Handle to the background monitoring task, if started.
    monitor_task: Option<JoinHandle<()>>,
    /// Flag used to signal the background loop to stop.
    running: Arc<Mutex<bool>>,
}

impl ServerMonitor {
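    /// Creates a new monitor that checks servers through the given lifecycle manager.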
    #[tracing::instrument(skip(lifecycle_manager, config))]
    pub fn new(
        lifecycle_manager: Arc<ServerLifecycleManager>,
        config: ServerMonitorConfig,
    ) -> Self {
        tracing::info!(config = ?config, "Creating new ServerMonitor");
        Self {
            lifecycle_manager,
            health_statuses: Arc::new(Mutex::new(HashMap::new())),
            failure_counts: Arc::new(Mutex::new(HashMap::new())),
            last_checked: Arc::new(Mutex::new(HashMap::new())),
            config,
            monitor_task: None,
            running: Arc::new(Mutex::new(false)),
        }
    }

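    /// Spawns the background task that periodically checks every tracked server.
    /// Calling `start` while the monitor is already running is a no-op.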
    #[tracing::instrument(skip(self))]
    pub fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if *running {
                tracing::debug!("Monitor already running");
                return Ok(());
            }
            tracing::info!("Starting monitor task");
            *running = true;
        }

        let lifecycle_manager = Arc::clone(&self.lifecycle_manager);
        let health_statuses = Arc::clone(&self.health_statuses);
        let failure_counts = Arc::clone(&self.failure_counts);
        let last_checked = Arc::clone(&self.last_checked);
        let running = Arc::clone(&self.running);
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            tracing::info!("Monitor loop started");
            let mut interval = time::interval(config.check_interval);

            loop {
                interval.tick().await;

                {
                    let running_guard = running.lock().unwrap();
                    if !*running_guard {
                        tracing::info!("Monitor loop stopping");
                        break;
                    }
                }
                tracing::debug!("Running health check cycle");

                let server_ids_to_check = {
                    let health_guard = health_statuses.lock().unwrap();
                    health_guard.keys().cloned().collect::<Vec<_>>()
                };
                tracing::trace!(servers = ?server_ids_to_check, "Checking health for servers");

                for server_id in server_ids_to_check {
                    let check_span =
                        tracing::info_span!("server_health_check", server_id = %server_id);
                    let _check_guard = check_span.enter();

                    tracing::debug!("Checking server health");
                    {
                        let mut checked = last_checked.lock().unwrap();
                        checked.insert(server_id, Instant::now());
                    }

                    match lifecycle_manager.get_status(server_id) {
                        Ok(status) => {
                            tracing::debug!(current_status = ?status, "Got server status");
                            let health = match status {
                                ServerStatus::Running => ServerHealth::Healthy,
                                ServerStatus::Failed => ServerHealth::Unhealthy,
                                _ => ServerHealth::Unknown,
                            };
                            tracing::info!(health = ?health, "Determined server health");

                            {
                                let mut statuses = health_statuses.lock().unwrap();
                                statuses.insert(server_id, health);
                            }

                            {
                                let mut counts = failure_counts.lock().unwrap();
                                if health == ServerHealth::Unhealthy {
                                    let count = counts.entry(server_id).or_insert(0);
                                    *count += 1;
                                    tracing::warn!(
                                        failure_count = *count,
                                        "Server health check failed"
                                    );

                                    if config.auto_restart
                                        && *count >= config.max_consecutive_failures
                                    {
                                        tracing::warn!(
                                            threshold = config.max_consecutive_failures,
                                            "Failure threshold reached, attempting auto-restart"
                                        );
                                        *count = 0;
                                        tracing::info!(
                                            "Auto-restart triggered (logic not implemented)"
                                        );
                                    }
                                } else if counts.get(&server_id).copied().unwrap_or(0) > 0 {
                                    tracing::info!("Resetting failure count");
                                    counts.insert(server_id, 0);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!(error = %e, "Failed to get server status during health check");
                        }
                    }
                }
            }
            tracing::info!("Monitor loop finished");
        });

        self.monitor_task = Some(task);

        Ok(())
    }

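    /// Signals the monitor loop to stop and aborts the background task, if any.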
    #[tracing::instrument(skip(self))]
    pub fn stop(&mut self) -> Result<()> {
        {
            let mut running = self.running.lock().map_err(|_| {
                tracing::error!("Failed to lock running flag");
                Error::Other("Failed to lock running flag".to_string())
            })?;

            if !*running {
                tracing::debug!("Monitor already stopped");
                return Ok(());
            }
            tracing::info!("Stopping monitor task");
            *running = false;
        }

        if let Some(task) = self.monitor_task.take() {
            tracing::debug!("Aborting monitor task handle");
            task.abort();
        }

        Ok(())
    }

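    /// Returns the last cached health status for the given server, or
    /// `Error::ServerNotFound` if the server has never been checked.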
    #[tracing::instrument(skip(self), fields(server_id = %id))]
    pub fn get_health(&self, id: ServerId) -> Result<ServerHealth> {
        tracing::debug!("Getting server health status");
        let health_statuses = self.health_statuses.lock().map_err(|_| {
            tracing::error!("Failed to lock health statuses");
            Error::Other("Failed to lock health statuses".to_string())
        })?;

        health_statuses.get(&id).copied().ok_or_else(|| {
            tracing::warn!("Health status requested for unknown server");
            Error::ServerNotFound(format!("{:?}", id))
        })
    }

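    /// Forces an immediate health check for a single server, updating the cached
    /// status and failure count, and returns the resulting health.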
    #[tracing::instrument(skip(self), fields(server_id = %id, server_name = %name))]
    pub async fn check_health(&self, id: ServerId, name: &str) -> Result<ServerHealth> {
        tracing::info!("Forcing health check");

        {
            let mut last_checked = self
                .last_checked
                .lock()
                .map_err(|_| Error::Other("Failed to lock last checked times".to_string()))?;
            last_checked.insert(id, Instant::now());
        }

        let status = self.lifecycle_manager.get_status(id)?;
        tracing::debug!(current_status = ?status, "Got server status for forced check");

        let health = match status {
            ServerStatus::Running => ServerHealth::Healthy,
            ServerStatus::Starting => ServerHealth::Unknown,
            ServerStatus::Stopping => ServerHealth::Unknown,
            ServerStatus::Stopped => ServerHealth::Unknown,
            ServerStatus::Failed => ServerHealth::Unhealthy,
        };
        tracing::info!(health = ?health, "Determined server health from forced check");

        {
            let mut health_statuses = self
                .health_statuses
                .lock()
                .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;
            health_statuses.insert(id, health);
        }

        {
            let mut failure_counts = self
                .failure_counts
                .lock()
                .map_err(|_| Error::Other("Failed to lock failure counts".to_string()))?;

            if health == ServerHealth::Unhealthy {
                let count = failure_counts.entry(id).or_insert(0);
                *count += 1;
                tracing::warn!(
                    failure_count = *count,
                    "Server health check failed (forced)"
                );

                if self.config.auto_restart && *count >= self.config.max_consecutive_failures {
                    tracing::warn!(
                        threshold = self.config.max_consecutive_failures,
                        "Failure threshold reached (forced), attempting auto-restart"
                    );
                    *count = 0;

                    self.lifecycle_manager.record_event(
                        id,
                        name.to_string(),
                        ServerLifecycleEvent::Restarted,
                        Some("Auto-restart after consecutive failures (forced check)".to_string()),
                    )?;
                    tracing::info!(
                        "Auto-restart triggered by forced check (logic not implemented)"
                    );
                }
            } else if failure_counts.get(&id).copied().unwrap_or(0) > 0 {
                tracing::info!("Resetting failure count (forced check)");
                failure_counts.insert(id, 0);
            }
        }

        Ok(health)
    }

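    /// Returns a snapshot of the cached health statuses for all tracked servers.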
    pub fn get_all_health(&self) -> Result<HashMap<ServerId, ServerHealth>> {
        let health_statuses = self
            .health_statuses
            .lock()
            .map_err(|_| Error::Other("Failed to lock health statuses".to_string()))?;

        Ok(health_statuses.clone())
    }
}