//! Predictive storage health monitoring: latency probes, free-space tracking,
//! and a simple failure-risk heuristic.

use std::collections::VecDeque;
use std::path::PathBuf;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use thiserror::Error;

/// Maximum number of metric samples retained in the rolling history.
const MAX_HISTORY_SAMPLES: usize = 1000;

/// Errors that can occur while monitoring storage health.
#[derive(Debug, Error)]
pub enum HealthMonitorError {
    #[error("IO error during health check: {0}")]
    IoError(#[from] std::io::Error),

    #[error("Failed to read disk metrics: {0}")]
    MetricsError(String),

    #[error("Insufficient data for prediction (need at least {required} samples, have {actual})")]
    InsufficientData { required: usize, actual: usize },
}

/// Configuration for storage health checks and failure prediction.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// Interval between health checks.
    pub check_interval: Duration,
    /// Average latency (milliseconds) above which a warning is raised.
    pub latency_warning_threshold_ms: u64,
    /// Average latency (milliseconds) above which status becomes critical.
    pub latency_critical_threshold_ms: u64,
    /// Minimum free space (percent of total) before status becomes critical.
    pub min_free_space_percent: f64,
    /// Number of consecutive failures that forces a critical status.
    pub critical_failure_count: usize,
    /// Whether to compute a predictive failure-risk score.
    pub enable_prediction: bool,
    /// Risk score (0.0..=1.0) above which a high-risk warning is raised.
    pub failure_risk_threshold: f64,
}

impl Default for HealthConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(60),
            latency_warning_threshold_ms: 100,
            latency_critical_threshold_ms: 500,
            min_free_space_percent: 10.0,
            critical_failure_count: 3,
            enable_prediction: true,
            failure_risk_threshold: 0.7,
        }
    }
}

/// Overall health classification reported by the monitor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PredictiveHealthStatus {
    /// All metrics are within configured thresholds.
    Healthy,
    /// At least one metric is approaching a critical threshold.
    Warning,
    /// A threshold has been exceeded or repeated failures were observed.
    Critical,
    /// Not enough information to classify health.
    Unknown,
}

/// Snapshot of storage health produced by a single call to `check_health`.
#[derive(Debug, Clone)]
pub struct PredictiveHealthReport {
    /// Overall classification derived from the metrics below.
    pub overall_status: PredictiveHealthStatus,
    /// Unix timestamp (seconds) at which the report was generated.
    pub timestamp: u64,
    /// Average read latency across the sample history, in milliseconds.
    pub avg_read_latency_ms: f64,
    /// Average write latency across the sample history, in milliseconds.
    pub avg_write_latency_ms: f64,
    /// Number of I/O errors recorded by the monitor so far.
    pub recent_io_errors: usize,
    /// Free space available on the monitored filesystem, in bytes.
    pub available_bytes: u64,
    /// Total capacity of the monitored filesystem, in bytes.
    pub total_bytes: u64,
    /// Free space as a percentage of total capacity.
    pub free_space_percent: f64,
    /// Predicted failure risk in 0.0..=1.0 (0.0 when prediction is disabled).
    pub failure_risk_score: f64,
    /// Human-readable warnings generated during the check.
    pub warnings: Vec<String>,
}

/// A single point-in-time metric sample kept in the rolling history.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct MetricSample {
    timestamp: u64,
    read_latency_ms: f64,
    write_latency_ms: f64,
    io_errors: usize,
    free_space_bytes: u64,
}

/// Monitors a storage path by sampling I/O latency and free space, and
/// derives a predictive failure-risk score from the sample history.
pub struct PredictiveStorageMonitor {
    storage_path: PathBuf,
    config: HealthConfig,
    history: VecDeque<MetricSample>,
    consecutive_failures: usize,
    last_check: Option<Instant>,
    io_error_count: usize,
    total_checks: usize,
}

impl PredictiveStorageMonitor {
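    /// Creates a new monitor for `storage_path` with the given configuration.
    /// No I/O is performed until [`check_health`](Self::check_health) is called.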
    pub fn new(storage_path: PathBuf, config: HealthConfig) -> Self {
        Self {
            storage_path,
            config,
            history: VecDeque::with_capacity(MAX_HISTORY_SAMPLES),
            consecutive_failures: 0,
            last_check: None,
            io_error_count: 0,
            total_checks: 0,
        }
    }

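    /// Runs a single health check: probes read/write latency, queries free
    /// space, records a metric sample, and returns a report with any warnings.
    ///
    /// A minimal usage sketch (the crate path and storage location are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// use std::path::PathBuf;
    /// // `my_crate` is a placeholder for this crate's actual name.
    /// use my_crate::{HealthConfig, PredictiveHealthStatus, PredictiveStorageMonitor};
    ///
    /// # async fn demo() {
    /// let mut monitor = PredictiveStorageMonitor::new(
    ///     PathBuf::from("/var/lib/my_app"),
    ///     HealthConfig::default(),
    /// );
    /// let report = monitor.check_health().await.expect("health check failed");
    /// if report.overall_status != PredictiveHealthStatus::Healthy {
    ///     eprintln!("storage warnings: {:?}", report.warnings);
    /// }
    /// # }
    /// ```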
    pub async fn check_health(&mut self) -> Result<PredictiveHealthReport, HealthMonitorError> {
        self.total_checks += 1;
        self.last_check = Some(Instant::now());

        // Collect the raw metrics for this check.
        let (read_latency, write_latency) = self.measure_latency().await?;
        let (available_bytes, total_bytes) = self.get_space_info()?;
        let free_space_percent = (available_bytes as f64 / total_bytes as f64) * 100.0;

        // Record the sample, evicting the oldest entry once the history is full.
        let sample = MetricSample {
            timestamp: current_timestamp(),
            read_latency_ms: read_latency,
            write_latency_ms: write_latency,
            io_errors: self.io_error_count,
            free_space_bytes: available_bytes,
        };

        if self.history.len() >= MAX_HISTORY_SAMPLES {
            self.history.pop_front();
        }
        self.history.push_back(sample);

        let avg_read_latency = self.calculate_avg_read_latency();
        let avg_write_latency = self.calculate_avg_write_latency();

        let mut warnings = Vec::new();
        let mut status = PredictiveHealthStatus::Healthy;

        // Latency thresholds.
        if avg_read_latency > self.config.latency_critical_threshold_ms as f64
            || avg_write_latency > self.config.latency_critical_threshold_ms as f64
        {
            status = PredictiveHealthStatus::Critical;
            warnings.push(format!(
                "Critical latency: read={:.2}ms, write={:.2}ms",
                avg_read_latency, avg_write_latency
            ));
            self.consecutive_failures += 1;
        } else if avg_read_latency > self.config.latency_warning_threshold_ms as f64
            || avg_write_latency > self.config.latency_warning_threshold_ms as f64
        {
            status = PredictiveHealthStatus::Warning;
            warnings.push(format!(
                "High latency: read={:.2}ms, write={:.2}ms",
                avg_read_latency, avg_write_latency
            ));
        } else {
            self.consecutive_failures = 0;
        }

        // Free-space thresholds.
        if free_space_percent < self.config.min_free_space_percent {
            status = PredictiveHealthStatus::Critical;
            warnings.push(format!("Low disk space: {:.2}% free", free_space_percent));
        } else if free_space_percent < self.config.min_free_space_percent * 2.0 {
            if status == PredictiveHealthStatus::Healthy {
                status = PredictiveHealthStatus::Warning;
            }
            warnings.push(format!(
                "Disk space running low: {:.2}% free",
                free_space_percent
            ));
        }

        // Repeated latency failures escalate to critical.
        if self.consecutive_failures >= self.config.critical_failure_count {
            status = PredictiveHealthStatus::Critical;
            warnings.push(format!(
                "Consecutive failures: {}",
                self.consecutive_failures
            ));
        }

        // Optional predictive scoring based on the sample history.
        let failure_risk = if self.config.enable_prediction {
            self.predict_failure_risk()
        } else {
            0.0
        };

        if failure_risk > self.config.failure_risk_threshold {
            if status == PredictiveHealthStatus::Healthy {
                status = PredictiveHealthStatus::Warning;
            }
            warnings.push(format!(
                "High failure risk detected: {:.1}%",
                failure_risk * 100.0
            ));
        }

        Ok(PredictiveHealthReport {
            overall_status: status,
            timestamp: current_timestamp(),
            avg_read_latency_ms: avg_read_latency,
            avg_write_latency_ms: avg_write_latency,
            recent_io_errors: self.io_error_count,
            available_bytes,
            total_bytes,
            free_space_percent,
            failure_risk_score: failure_risk,
            warnings,
        })
    }

    /// Writes and reads back a small probe file under `storage_path`, returning
    /// `(read_latency_ms, write_latency_ms)`. I/O failures are counted but do
    /// not abort the check.
    async fn measure_latency(&mut self) -> Result<(f64, f64), HealthMonitorError> {
        let test_file = self.storage_path.join(".health_check");

        // Time a small write.
        let write_start = Instant::now();
        let write_result = tokio::fs::write(&test_file, b"health_check").await;
        let write_latency = write_start.elapsed().as_secs_f64() * 1000.0;

        if write_result.is_err() {
            self.io_error_count += 1;
        }

        // Time the read back.
        let read_start = Instant::now();
        let read_result = tokio::fs::read(&test_file).await;
        let read_latency = read_start.elapsed().as_secs_f64() * 1000.0;

        if read_result.is_err() {
            self.io_error_count += 1;
        }

        // Best-effort cleanup of the probe file.
        let _ = tokio::fs::remove_file(&test_file).await;

        Ok((read_latency, write_latency))
    }

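    /// Returns `(available_bytes, total_bytes)` for the filesystem containing
    /// `storage_path`. On Unix this calls `statvfs`; elsewhere (or if the call
    /// fails) fixed placeholder values are returned.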
    fn get_space_info(&self) -> Result<(u64, u64), HealthMonitorError> {
        #[cfg(unix)]
        {
            let _metadata = std::fs::metadata(&self.storage_path)?;

            use std::ffi::CString;
            use std::os::raw::c_char;

            // Hand-rolled statvfs layout used for the FFI call below.
            #[cfg(target_os = "macos")]
            #[repr(C)]
            #[allow(non_camel_case_types)]
            struct statvfs {
                f_bsize: u64,
                f_frsize: u64,
                f_blocks: u64,
                f_bfree: u64,
                f_bavail: u64,
                f_files: u64,
                f_ffree: u64,
                f_favail: u64,
                f_fsid: u64,
                f_flag: u64,
                f_namemax: u64,
            }

            #[cfg(not(target_os = "macos"))]
            #[repr(C)]
            #[allow(non_camel_case_types)]
            struct statvfs {
                f_bsize: libc::c_ulong,
                f_frsize: libc::c_ulong,
                f_blocks: u64,
                f_bfree: u64,
                f_bavail: u64,
                f_files: u64,
                f_ffree: u64,
                f_favail: u64,
                f_fsid: libc::c_ulong,
                f_flag: libc::c_ulong,
                f_namemax: libc::c_ulong,
                _padding: [i32; 6],
            }

            unsafe extern "C" {
                fn statvfs(path: *const c_char, buf: *mut statvfs) -> i32;
            }

            let path_cstr = CString::new(self.storage_path.to_str().unwrap_or_default())
                .map_err(|e| HealthMonitorError::MetricsError(e.to_string()))?;

            let mut stats: statvfs = unsafe { std::mem::zeroed() };
            let result = unsafe { statvfs(path_cstr.as_ptr(), &mut stats) };

            if result == 0 {
                // Free/total space = block counts scaled by the fragment size.
                let block_size = stats.f_frsize;
                let available = stats.f_bavail.saturating_mul(block_size);
                let total = stats.f_blocks.saturating_mul(block_size);
                Ok((available, total))
            } else {
                // statvfs failed; fall back to placeholder values.
                Ok((100_000_000_000, 1_000_000_000_000))
            }
        }

        #[cfg(not(unix))]
        {
            // No platform-specific metrics available; use placeholder values.
            Ok((100_000_000_000, 1_000_000_000_000))
        }
    }

    fn calculate_avg_read_latency(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }

        let sum: f64 = self.history.iter().map(|s| s.read_latency_ms).sum();
        sum / self.history.len() as f64
    }

    fn calculate_avg_write_latency(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }

        let sum: f64 = self.history.iter().map(|s| s.write_latency_ms).sum();
        sum / self.history.len() as f64
    }

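    /// Heuristic failure-risk score in `0.0..=1.0`, combining three signals:
    /// a rising read-latency trend (recent vs. older samples), the cumulative
    /// I/O error rate, and the estimated days until free space is exhausted.
    /// Returns `0.0` when fewer than ten samples are available.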
    fn predict_failure_risk(&self) -> f64 {
        if self.history.len() < 10 {
            return 0.0;
        }

        let mut risk_score: f64 = 0.0;

        // Signal 1: read latency trending upward.
        let recent_read_latency = self.recent_avg_read_latency(10);
        let older_read_latency = self.older_avg_read_latency(10);

        if older_read_latency > 0.0 {
            let latency_increase = (recent_read_latency - older_read_latency) / older_read_latency;
            if latency_increase > 0.5 {
                risk_score += 0.3;
            } else if latency_increase > 0.2 {
                risk_score += 0.15;
            }
        }

        // Signal 2: I/O error rate across all checks.
        let error_rate = self.io_error_count as f64 / self.total_checks as f64;
        if error_rate > 0.05 {
            risk_score += 0.4;
        } else if error_rate > 0.01 {
            risk_score += 0.2;
        }

        // Signal 3: projected time until the disk fills up.
        if let Some(space_depletion_days) = self.estimate_space_depletion_days() {
            if space_depletion_days < 7.0 {
                risk_score += 0.3;
            } else if space_depletion_days < 30.0 {
                risk_score += 0.15;
            }
        }

        risk_score.min(1.0)
    }

    /// Average read latency over the most recent `count` samples.
    fn recent_avg_read_latency(&self, count: usize) -> f64 {
        if self.history.len() < count {
            return 0.0;
        }

        let recent: Vec<_> = self.history.iter().rev().take(count).collect();
        let sum: f64 = recent.iter().map(|s| s.read_latency_ms).sum();
        sum / count as f64
    }

    /// Average read latency over the `count` samples preceding the most recent `count`.
    fn older_avg_read_latency(&self, count: usize) -> f64 {
        if self.history.len() < count * 2 {
            return 0.0;
        }

        let older: Vec<_> = self
            .history
            .iter()
            .skip(self.history.len() - count * 2)
            .take(count)
            .collect();
        let sum: f64 = older.iter().map(|s| s.read_latency_ms).sum();
        sum / count as f64
    }

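    /// Estimates how many days remain until free space is exhausted by
    /// comparing the average free space of the ten most recent samples with
    /// the ten samples before them and extrapolating the depletion rate.
    /// Returns `None` if fewer than twenty samples exist, if free space is not
    /// shrinking, or if the samples span zero elapsed time.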
    fn estimate_space_depletion_days(&self) -> Option<f64> {
        if self.history.len() < 20 {
            return None;
        }

        let history_vec: Vec<_> = self.history.iter().collect();
        let recent = &history_vec[history_vec.len() - 10..];
        let older = &history_vec[history_vec.len() - 20..history_vec.len() - 10];

        let recent_sum: u64 = recent
            .iter()
            .map(|s| s.free_space_bytes)
            .fold(0u64, |acc, x| acc.saturating_add(x));
        let recent_avg_free: u64 = recent_sum / recent.len() as u64;

        let older_sum: u64 = older
            .iter()
            .map(|s| s.free_space_bytes)
            .fold(0u64, |acc, x| acc.saturating_add(x));
        let older_avg_free: u64 = older_sum / older.len() as u64;

        if recent_avg_free >= older_avg_free {
            // Free space is stable or growing; nothing to project.
            return None;
        }

        let space_lost = older_avg_free.saturating_sub(recent_avg_free);
        let recent_last = recent[recent.len() - 1];
        let older_first = older[0];
        let time_span_hours =
            (recent_last.timestamp.saturating_sub(older_first.timestamp)) as f64 / 3600.0;

        if time_span_hours == 0.0 {
            return None;
        }

        let depletion_rate_per_hour = space_lost as f64 / time_span_hours;
        let hours_until_full = recent_avg_free as f64 / depletion_rate_per_hour;

        Some(hours_until_full / 24.0)
    }

    /// Number of metric samples currently held in the history.
    #[must_use]
    #[inline]
    pub fn sample_count(&self) -> usize {
        self.history.len()
    }

    /// Total number of health checks performed so far.
    #[must_use]
    pub const fn total_checks(&self) -> usize {
        self.total_checks
    }

    /// Path of the storage location being monitored.
    #[must_use]
    pub fn storage_path(&self) -> &PathBuf {
        &self.storage_path
    }
}

/// Current Unix timestamp in seconds (0 if the system clock is before the epoch).
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn create_test_monitor() -> (PredictiveStorageMonitor, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = HealthConfig {
            check_interval: Duration::from_millis(10),
            latency_warning_threshold_ms: 50,
            latency_critical_threshold_ms: 200,
            min_free_space_percent: 10.0,
            critical_failure_count: 3,
            enable_prediction: true,
            failure_risk_threshold: 0.7,
        };
        let monitor = PredictiveStorageMonitor::new(temp_dir.path().to_path_buf(), config);
        (monitor, temp_dir)
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        let report = monitor.check_health().await.unwrap();
        assert_eq!(report.overall_status, PredictiveHealthStatus::Healthy);
        assert_eq!(monitor.sample_count(), 1);
    }

    #[tokio::test]
    async fn test_multiple_health_checks() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        for _ in 0..5 {
            let report = monitor.check_health().await.unwrap();
            assert!(matches!(
                report.overall_status,
                PredictiveHealthStatus::Healthy | PredictiveHealthStatus::Warning
            ));
        }

        assert_eq!(monitor.sample_count(), 5);
        assert_eq!(monitor.total_checks(), 5);
    }

    #[tokio::test]
    async fn test_latency_measurement() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        let report = monitor.check_health().await.unwrap();
        assert!(report.avg_read_latency_ms >= 0.0);
        assert!(report.avg_write_latency_ms >= 0.0);
    }

    #[test]
    fn test_health_config_defaults() {
        let config = HealthConfig::default();
        assert_eq!(config.check_interval, Duration::from_secs(60));
        assert_eq!(config.latency_warning_threshold_ms, 100);
        assert_eq!(config.min_free_space_percent, 10.0);
        assert!(config.enable_prediction);
    }

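    // Illustrative addition: partial configs can be built with struct-update
    // syntax, keeping the remaining fields at their defaults.
    #[test]
    fn test_health_config_struct_update() {
        let config = HealthConfig {
            min_free_space_percent: 5.0,
            ..HealthConfig::default()
        };
        assert_eq!(config.min_free_space_percent, 5.0);
        assert_eq!(config.latency_critical_threshold_ms, 500);
        assert!(config.enable_prediction);
    }
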
    #[tokio::test]
    async fn test_history_limit() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        for _ in 0..MAX_HISTORY_SAMPLES + 10 {
            let _ = monitor.check_health().await;
        }

        assert_eq!(monitor.sample_count(), MAX_HISTORY_SAMPLES);
    }

    #[tokio::test]
    async fn test_failure_risk_insufficient_data() {
        let (monitor, _temp_dir) = create_test_monitor();

        let risk = monitor.predict_failure_risk();
        assert_eq!(risk, 0.0);
    }

    #[tokio::test]
    async fn test_space_info() {
        let (monitor, _temp_dir) = create_test_monitor();

        let result = monitor.get_space_info();
        assert!(result.is_ok());

        let (available, total) = result.unwrap();
        assert!(available > 0);
        assert!(total > 0);
        assert!(available <= total);
    }

    #[tokio::test]
    async fn test_average_calculations() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        for _ in 0..10 {
            let _ = monitor.check_health().await;
        }

        let avg_read = monitor.calculate_avg_read_latency();
        let avg_write = monitor.calculate_avg_write_latency();

        assert!(avg_read >= 0.0);
        assert!(avg_write >= 0.0);
    }
}