chie_core/
storage_health.rs

//! Storage health monitoring with predictive failure detection.
//!
//! This module provides comprehensive monitoring of storage health metrics
//! and predictive analysis to detect potential storage failures before they occur.
//!
//! # Example
//!
//! ```
//! use chie_core::storage_health::{PredictiveStorageMonitor, HealthConfig};
//! use std::path::PathBuf;
//! use std::time::Duration;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let config = HealthConfig {
//!     check_interval: Duration::from_secs(60),
//!     latency_warning_threshold_ms: 100,
//!     latency_critical_threshold_ms: 500,
//!     ..Default::default()
//! };
//!
//! let mut monitor = PredictiveStorageMonitor::new(PathBuf::from("/storage"), config);
//!
//! // Perform health check
//! let health = monitor.check_health().await?;
//! println!("Storage health: {:?}", health.overall_status);
//! println!("Failure risk: {:.2}%", health.failure_risk_score * 100.0);
//! # Ok(())
//! # }
//! ```

use std::collections::VecDeque;
use std::path::PathBuf;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use thiserror::Error;

/// Maximum number of historical samples to keep for analysis.
const MAX_HISTORY_SAMPLES: usize = 1000;

/// Errors that can occur during health monitoring.
#[derive(Debug, Error)]
pub enum HealthMonitorError {
    #[error("IO error during health check: {0}")]
    IoError(#[from] std::io::Error),

    #[error("Failed to read disk metrics: {0}")]
    MetricsError(String),

    #[error("Insufficient data for prediction (need at least {required} samples, have {actual})")]
    InsufficientData { required: usize, actual: usize },
}

/// Configuration for storage health monitoring.
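///
/// # Example
///
/// A minimal sketch of overriding a subset of fields; everything not listed
/// falls back to the [`Default`] values.
///
/// ```
/// use chie_core::storage_health::HealthConfig;
/// use std::time::Duration;
///
/// let config = HealthConfig {
///     check_interval: Duration::from_secs(30),
///     min_free_space_percent: 15.0,
///     ..Default::default()
/// };
/// assert_eq!(config.latency_warning_threshold_ms, 100);
/// ```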
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// Interval between health checks.
    pub check_interval: Duration,
    /// Warning threshold for read/write latency (ms).
    pub latency_warning_threshold_ms: u64,
    /// Critical threshold for read/write latency (ms).
    pub latency_critical_threshold_ms: u64,
    /// Minimum free space percentage before warning.
    pub min_free_space_percent: f64,
    /// Number of consecutive failures before marking critical.
    pub critical_failure_count: usize,
    /// Enable predictive failure detection.
    pub enable_prediction: bool,
    /// Failure risk threshold for warnings (0.0-1.0).
    pub failure_risk_threshold: f64,
}

impl Default for HealthConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(60),
            latency_warning_threshold_ms: 100,
            latency_critical_threshold_ms: 500,
            min_free_space_percent: 10.0,
            critical_failure_count: 3,
            enable_prediction: true,
            failure_risk_threshold: 0.7,
        }
    }
}

/// Overall storage health status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PredictiveHealthStatus {
    /// Storage is healthy.
    Healthy,
    /// Storage shows warning signs.
    Warning,
    /// Storage is in critical condition.
    Critical,
    /// Storage health is unknown.
    Unknown,
}

/// Detailed storage health report.
#[derive(Debug, Clone)]
pub struct PredictiveHealthReport {
    /// Overall health status.
    pub overall_status: PredictiveHealthStatus,
    /// Timestamp of this report.
    pub timestamp: u64,
    /// Average read latency (ms).
    pub avg_read_latency_ms: f64,
    /// Average write latency (ms).
    pub avg_write_latency_ms: f64,
    /// Recent IO error count.
    pub recent_io_errors: usize,
    /// Available space (bytes).
    pub available_bytes: u64,
    /// Total space (bytes).
    pub total_bytes: u64,
    /// Free space percentage.
    pub free_space_percent: f64,
    /// Predicted failure risk score (0.0-1.0).
    pub failure_risk_score: f64,
    /// Warning messages.
    pub warnings: Vec<String>,
}

/// Historical storage metric sample.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct MetricSample {
    timestamp: u64,
    read_latency_ms: f64,
    write_latency_ms: f64,
    io_errors: usize,
    free_space_bytes: u64,
}

/// Storage health monitor with predictive capabilities.
pub struct PredictiveStorageMonitor {
    storage_path: PathBuf,
    config: HealthConfig,
    history: VecDeque<MetricSample>,
    consecutive_failures: usize,
    last_check: Option<Instant>,
    io_error_count: usize,
    total_checks: usize,
}

impl PredictiveStorageMonitor {
    /// Create a new storage health monitor.
    pub fn new(storage_path: PathBuf, config: HealthConfig) -> Self {
        Self {
            storage_path,
            config,
            history: VecDeque::with_capacity(MAX_HISTORY_SAMPLES),
            consecutive_failures: 0,
            last_check: None,
            io_error_count: 0,
            total_checks: 0,
        }
    }

    /// Perform a health check and return the report.
    pub async fn check_health(&mut self) -> Result<PredictiveHealthReport, HealthMonitorError> {
        self.total_checks += 1;
        self.last_check = Some(Instant::now());

        // Collect current metrics
        let (read_latency, write_latency) = self.measure_latency().await?;
        let (available_bytes, total_bytes) = self.get_space_info()?;
        // Guard against a zero total to avoid producing a NaN percentage.
        let free_space_percent = if total_bytes == 0 {
            0.0
        } else {
            (available_bytes as f64 / total_bytes as f64) * 100.0
        };

        // Record sample
        let sample = MetricSample {
            timestamp: current_timestamp(),
            read_latency_ms: read_latency,
            write_latency_ms: write_latency,
            io_errors: self.io_error_count,
            free_space_bytes: available_bytes,
        };

        if self.history.len() >= MAX_HISTORY_SAMPLES {
            self.history.pop_front();
        }
        self.history.push_back(sample);

        // Calculate averages
        let avg_read_latency = self.calculate_avg_read_latency();
        let avg_write_latency = self.calculate_avg_write_latency();

        // Determine health status
        let mut warnings = Vec::new();
        let mut status = PredictiveHealthStatus::Healthy;

        // Check latency
        if avg_read_latency > self.config.latency_critical_threshold_ms as f64
            || avg_write_latency > self.config.latency_critical_threshold_ms as f64
        {
            status = PredictiveHealthStatus::Critical;
            warnings.push(format!(
                "Critical latency: read={:.2}ms, write={:.2}ms",
                avg_read_latency, avg_write_latency
            ));
            self.consecutive_failures += 1;
        } else if avg_read_latency > self.config.latency_warning_threshold_ms as f64
            || avg_write_latency > self.config.latency_warning_threshold_ms as f64
        {
            status = PredictiveHealthStatus::Warning;
            warnings.push(format!(
                "High latency: read={:.2}ms, write={:.2}ms",
                avg_read_latency, avg_write_latency
            ));
        } else {
            self.consecutive_failures = 0;
        }

        // Check free space
        if free_space_percent < self.config.min_free_space_percent {
            status = PredictiveHealthStatus::Critical;
            warnings.push(format!("Low disk space: {:.2}% free", free_space_percent));
        } else if free_space_percent < self.config.min_free_space_percent * 2.0 {
            if status == PredictiveHealthStatus::Healthy {
                status = PredictiveHealthStatus::Warning;
            }
            warnings.push(format!(
                "Disk space running low: {:.2}% free",
                free_space_percent
            ));
        }

        // Check consecutive failures
        if self.consecutive_failures >= self.config.critical_failure_count {
            status = PredictiveHealthStatus::Critical;
            warnings.push(format!(
                "Consecutive failures: {}",
                self.consecutive_failures
            ));
        }

        // Calculate failure risk
        let failure_risk = if self.config.enable_prediction {
            self.predict_failure_risk()
        } else {
            0.0
        };

        if failure_risk > self.config.failure_risk_threshold {
            if status == PredictiveHealthStatus::Healthy {
                status = PredictiveHealthStatus::Warning;
            }
            warnings.push(format!(
                "High failure risk detected: {:.1}%",
                failure_risk * 100.0
            ));
        }

        Ok(PredictiveHealthReport {
            overall_status: status,
            timestamp: current_timestamp(),
            avg_read_latency_ms: avg_read_latency,
            avg_write_latency_ms: avg_write_latency,
            recent_io_errors: self.io_error_count,
            available_bytes,
            total_bytes,
            free_space_percent,
            failure_risk_score: failure_risk,
            warnings,
        })
    }

    /// Measure current read/write latency by performing test operations.
    async fn measure_latency(&mut self) -> Result<(f64, f64), HealthMonitorError> {
        let test_file = self.storage_path.join(".health_check");

        // Measure write latency
        let write_start = Instant::now();
        let write_result = tokio::fs::write(&test_file, b"health_check").await;
        let write_latency = write_start.elapsed().as_secs_f64() * 1000.0;

        if write_result.is_err() {
            self.io_error_count += 1;
        }

        // Measure read latency
        let read_start = Instant::now();
        let read_result = tokio::fs::read(&test_file).await;
        let read_latency = read_start.elapsed().as_secs_f64() * 1000.0;

        if read_result.is_err() {
            self.io_error_count += 1;
        }

        // Clean up
        let _ = tokio::fs::remove_file(&test_file).await;

        Ok((read_latency, write_latency))
    }

    /// Get storage space information.
    fn get_space_info(&self) -> Result<(u64, u64), HealthMonitorError> {
        #[cfg(unix)]
        {
            use std::ffi::CString;

            // Surface a missing or inaccessible path as an IO error before querying the filesystem.
            let _metadata = std::fs::metadata(&self.storage_path)?;

            // Get filesystem stats via libc's statvfs binding so the struct layout
            // matches the platform ABI (field widths differ between Linux and macOS).
            let path_cstr = CString::new(self.storage_path.to_str().unwrap_or_default())
                .map_err(|e| HealthMonitorError::MetricsError(e.to_string()))?;

            let mut stats: libc::statvfs = unsafe { std::mem::zeroed() };
            let result = unsafe { libc::statvfs(path_cstr.as_ptr(), &mut stats) };

            if result == 0 {
                // The fragment size is the unit for the block counts.
                // Use saturating_mul to prevent overflow.
                let block_size = stats.f_frsize as u64;
                let available = (stats.f_bavail as u64).saturating_mul(block_size);
                let total = (stats.f_blocks as u64).saturating_mul(block_size);
                Ok((available, total))
            } else {
                // Fallback to simple estimate
                Ok((100_000_000_000, 1_000_000_000_000)) // 100GB available, 1TB total
            }
        }

        #[cfg(not(unix))]
        {
            // Simplified implementation for non-Unix systems
            Ok((100_000_000_000, 1_000_000_000_000)) // 100GB available, 1TB total
        }
    }

    /// Calculate average read latency from history.
    fn calculate_avg_read_latency(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }

        let sum: f64 = self.history.iter().map(|s| s.read_latency_ms).sum();
        sum / self.history.len() as f64
    }

    /// Calculate average write latency from history.
    fn calculate_avg_write_latency(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }

        let sum: f64 = self.history.iter().map(|s| s.write_latency_ms).sum();
        sum / self.history.len() as f64
    }

    /// Predict failure risk based on historical trends.
    fn predict_failure_risk(&self) -> f64 {
        if self.history.len() < 10 {
            return 0.0;
        }

        let mut risk_score: f64 = 0.0;

        // Analyze latency trends
        let recent_read_latency = self.recent_avg_read_latency(10);
        let older_read_latency = self.older_avg_read_latency(10);

        if older_read_latency > 0.0 {
            let latency_increase = (recent_read_latency - older_read_latency) / older_read_latency;
            if latency_increase > 0.5 {
                risk_score += 0.3; // 30% risk for rapidly increasing latency
            } else if latency_increase > 0.2 {
                risk_score += 0.15;
            }
        }

        // Analyze error rate trends
        let error_rate = self.io_error_count as f64 / self.total_checks as f64;
        if error_rate > 0.05 {
            risk_score += 0.4; // 40% risk for high error rate
        } else if error_rate > 0.01 {
            risk_score += 0.2;
        }

        // Analyze space depletion rate
        if let Some(space_depletion_days) = self.estimate_space_depletion_days() {
            if space_depletion_days < 7.0 {
                risk_score += 0.3;
            } else if space_depletion_days < 30.0 {
                risk_score += 0.15;
            }
        }

        risk_score.min(1.0)
    }

    /// Calculate average read latency for recent samples.
    fn recent_avg_read_latency(&self, count: usize) -> f64 {
        if self.history.len() < count {
            return 0.0;
        }

        let recent: Vec<_> = self.history.iter().rev().take(count).collect();
        let sum: f64 = recent.iter().map(|s| s.read_latency_ms).sum();
        sum / count as f64
    }

    /// Calculate average read latency for older samples.
    fn older_avg_read_latency(&self, count: usize) -> f64 {
        if self.history.len() < count * 2 {
            return 0.0;
        }

        let older: Vec<_> = self
            .history
            .iter()
            .skip(self.history.len() - count * 2)
            .take(count)
            .collect();
        let sum: f64 = older.iter().map(|s| s.read_latency_ms).sum();
        sum / count as f64
    }

    /// Estimate days until storage is full based on current depletion rate.
    fn estimate_space_depletion_days(&self) -> Option<f64> {
        if self.history.len() < 20 {
            return None;
        }

        let history_vec: Vec<_> = self.history.iter().collect();
        let recent = &history_vec[history_vec.len() - 10..];
        let older = &history_vec[history_vec.len() - 20..history_vec.len() - 10];

        // Use saturating_add to prevent overflow when summing large disk space values
        let recent_sum: u64 = recent
            .iter()
            .map(|s| s.free_space_bytes)
            .fold(0u64, |acc, x| acc.saturating_add(x));
        let recent_avg_free: u64 = recent_sum / recent.len() as u64;

        let older_sum: u64 = older
            .iter()
            .map(|s| s.free_space_bytes)
            .fold(0u64, |acc, x| acc.saturating_add(x));
        let older_avg_free: u64 = older_sum / older.len() as u64;

        if recent_avg_free >= older_avg_free {
            return None; // Not depleting
        }

        let space_lost = older_avg_free.saturating_sub(recent_avg_free);
        let recent_last = recent[recent.len() - 1];
        let older_first = older[0];
        let time_span_hours =
            (recent_last.timestamp.saturating_sub(older_first.timestamp)) as f64 / 3600.0;

        if time_span_hours == 0.0 {
            return None;
        }

        let depletion_rate_per_hour = space_lost as f64 / time_span_hours;
        let hours_until_full = recent_avg_free as f64 / depletion_rate_per_hour;

        Some(hours_until_full / 24.0) // Convert to days
    }

    /// Get the number of historical samples collected.
    #[must_use]
    #[inline]
    pub fn sample_count(&self) -> usize {
        self.history.len()
    }

    /// Get the total number of checks performed.
    #[must_use]
    pub const fn total_checks(&self) -> usize {
        self.total_checks
    }

    /// Get the storage path being monitored.
    #[must_use]
    pub fn storage_path(&self) -> &PathBuf {
        &self.storage_path
    }
}

/// Get current Unix timestamp.
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn create_test_monitor() -> (PredictiveStorageMonitor, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = HealthConfig {
            check_interval: Duration::from_millis(10),
            latency_warning_threshold_ms: 50,
            latency_critical_threshold_ms: 200,
            min_free_space_percent: 10.0,
            critical_failure_count: 3,
            enable_prediction: true,
            failure_risk_threshold: 0.7,
        };
        let monitor = PredictiveStorageMonitor::new(temp_dir.path().to_path_buf(), config);
        (monitor, temp_dir)
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        let report = monitor.check_health().await.unwrap();
        assert_eq!(report.overall_status, PredictiveHealthStatus::Healthy);
        assert_eq!(monitor.sample_count(), 1);
    }

    #[tokio::test]
    async fn test_multiple_health_checks() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        for _ in 0..5 {
            let report = monitor.check_health().await.unwrap();
            assert!(matches!(
                report.overall_status,
                PredictiveHealthStatus::Healthy | PredictiveHealthStatus::Warning
            ));
        }

        assert_eq!(monitor.sample_count(), 5);
        assert_eq!(monitor.total_checks(), 5);
    }

    #[tokio::test]
    async fn test_latency_measurement() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        let report = monitor.check_health().await.unwrap();
        assert!(report.avg_read_latency_ms >= 0.0);
        assert!(report.avg_write_latency_ms >= 0.0);
    }
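
    // Supplementary illustrative test (an assumption-based sketch): with the
    // warning threshold forced to 0 ms and the critical threshold effectively
    // unreachable, any measurable I/O latency should push the status out of
    // Healthy. The free-space check may still escalate it to Critical, so both
    // non-Healthy outcomes are accepted.
    #[tokio::test]
    async fn test_latency_warning_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let config = HealthConfig {
            latency_warning_threshold_ms: 0,
            latency_critical_threshold_ms: u64::MAX,
            ..Default::default()
        };
        let mut monitor = PredictiveStorageMonitor::new(temp_dir.path().to_path_buf(), config);

        let report = monitor.check_health().await.unwrap();
        assert!(matches!(
            report.overall_status,
            PredictiveHealthStatus::Warning | PredictiveHealthStatus::Critical
        ));
        assert!(!report.warnings.is_empty());
    }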

    #[test]
    fn test_health_config_defaults() {
        let config = HealthConfig::default();
        assert_eq!(config.check_interval, Duration::from_secs(60));
        assert_eq!(config.latency_warning_threshold_ms, 100);
        assert_eq!(config.min_free_space_percent, 10.0);
        assert!(config.enable_prediction);
    }

    #[tokio::test]
    async fn test_history_limit() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        // Add more samples than the limit
        for _ in 0..MAX_HISTORY_SAMPLES + 10 {
            let _ = monitor.check_health().await;
        }

        assert_eq!(monitor.sample_count(), MAX_HISTORY_SAMPLES);
    }

    #[tokio::test]
    async fn test_failure_risk_insufficient_data() {
        let (monitor, _temp_dir) = create_test_monitor();

        // With no history, risk should be 0
        let risk = monitor.predict_failure_risk();
        assert_eq!(risk, 0.0);
    }
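
    // Supplementary illustrative test (an assumption-based sketch): it sets the
    // private error counters directly to simulate a 50% I/O error rate and checks
    // that, once enough samples exist, the predicted risk includes the
    // high-error-rate contribution (+0.4 in predict_failure_risk).
    #[tokio::test]
    async fn test_error_rate_raises_failure_risk() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        // Build the minimum of 10 history samples required for prediction.
        for _ in 0..10 {
            let _ = monitor.check_health().await;
        }

        monitor.io_error_count = 5;
        monitor.total_checks = 10;

        let risk = monitor.predict_failure_risk();
        assert!(risk >= 0.4);
    }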

    #[tokio::test]
    async fn test_space_info() {
        let (monitor, _temp_dir) = create_test_monitor();

        let result = monitor.get_space_info();
        assert!(result.is_ok());

        let (available, total) = result.unwrap();
        assert!(available > 0);
        assert!(total > 0);
        assert!(available <= total);
    }
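
    // Supplementary illustrative test (an assumption-based sketch): with the
    // minimum free-space threshold set above 100%, free_space_percent can never
    // satisfy it, so every check should report Critical with a warning attached.
    #[tokio::test]
    async fn test_low_free_space_marks_critical() {
        let temp_dir = TempDir::new().unwrap();
        let config = HealthConfig {
            min_free_space_percent: 101.0,
            ..Default::default()
        };
        let mut monitor = PredictiveStorageMonitor::new(temp_dir.path().to_path_buf(), config);

        let report = monitor.check_health().await.unwrap();
        assert_eq!(report.overall_status, PredictiveHealthStatus::Critical);
        assert!(!report.warnings.is_empty());
    }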

    #[tokio::test]
    async fn test_average_calculations() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        // Perform multiple checks to build history
        for _ in 0..10 {
            let _ = monitor.check_health().await;
        }

        let avg_read = monitor.calculate_avg_read_latency();
        let avg_write = monitor.calculate_avg_write_latency();

        assert!(avg_read >= 0.0);
        assert!(avg_write >= 0.0);
    }
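
    // Supplementary illustrative test (an assumption-based sketch): it seeds the
    // private history with synthetic samples, one hour apart, losing one GiB of
    // free space per hour, and checks the resulting estimate. With 85.5 GiB of
    // average recent free space depleting at 10 GiB per 19 hours, the estimate
    // works out to roughly 6.8 days.
    #[test]
    fn test_space_depletion_estimate_with_synthetic_history() {
        let (mut monitor, _temp_dir) = create_test_monitor();

        let gib: u64 = 1024 * 1024 * 1024;
        for i in 0..20u64 {
            monitor.history.push_back(MetricSample {
                timestamp: i * 3600,
                read_latency_ms: 1.0,
                write_latency_ms: 1.0,
                io_errors: 0,
                free_space_bytes: 100 * gib - i * gib,
            });
        }

        let days = monitor
            .estimate_space_depletion_days()
            .expect("free space is shrinking, so an estimate should exist");
        assert!(days > 6.0 && days < 7.5);
    }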
}