ipfrs_storage/
diagnostics.rs

1//! Storage diagnostics and health monitoring utilities
2//!
3//! Provides comprehensive tools for analyzing storage performance,
4//! health, and identifying potential issues.
5
6use crate::traits::BlockStore;
7use ipfrs_core::{Block, Cid, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{Duration, Instant};
11use sysinfo::{ProcessesToUpdate, System};
12
/// Comprehensive storage diagnostics report
///
/// Produced by [`StorageDiagnostics::run`]; serializable so it can be
/// logged, persisted, or compared across backends via [`BenchmarkComparison`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiagnosticsReport {
    /// Storage backend name (label supplied to the diagnostics runner)
    pub backend: String,
    /// Total blocks tested
    pub total_blocks: usize,
    /// Performance metrics
    pub performance: PerformanceMetrics,
    /// Health check results
    pub health: HealthMetrics,
    /// Recommendations for optimization
    pub recommendations: Vec<String>,
    /// Overall health score (0-100)
    pub health_score: u8,
}
29
/// Performance metrics for storage operations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Average write latency (total single-block write time / block count)
    pub avg_write_latency: Duration,
    /// Average read latency (total single-block read time / block count)
    pub avg_read_latency: Duration,
    /// Batch write latency
    ///
    /// NOTE: despite the name, this is the total elapsed time of one
    /// `put_many` call over the whole test set, not a per-block average.
    pub avg_batch_write_latency: Duration,
    /// Batch read latency
    ///
    /// NOTE: as above — total elapsed time of one `get_many` call.
    pub avg_batch_read_latency: Duration,
    /// Write throughput (blocks/sec)
    pub write_throughput: f64,
    /// Read throughput (blocks/sec)
    pub read_throughput: f64,
    /// Peak memory usage (bytes, process-wide as reported by `sysinfo`)
    pub peak_memory_usage: usize,
}
48
/// Health metrics for storage backend
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Number of successful operations (individual and batch)
    pub successful_ops: usize,
    /// Number of failed operations (individual and batch)
    pub failed_ops: usize,
    /// Success rate (0.0 - 1.0); 0.0 when no operations were performed
    pub success_rate: f64,
    /// Data integrity check passed (all read-back payloads matched writes)
    pub integrity_ok: bool,
    /// Storage is responsive (latency below fixed thresholds)
    pub responsive: bool,
}
63
64/// Memory usage tracker for diagnostics
65struct MemoryTracker {
66    system: System,
67    pid: sysinfo::Pid,
68    peak_memory: usize,
69}
70
71impl MemoryTracker {
72    /// Create a new memory tracker
73    fn new() -> Self {
74        let mut system = System::new();
75        system.refresh_processes(ProcessesToUpdate::All, true);
76        let pid = sysinfo::get_current_pid().unwrap();
77
78        Self {
79            system,
80            pid,
81            peak_memory: 0,
82        }
83    }
84
85    /// Update peak memory usage
86    fn update(&mut self) {
87        self.system.refresh_processes(ProcessesToUpdate::All, true);
88        if let Some(process) = self.system.process(self.pid) {
89            let current_memory = process.memory() as usize;
90            if current_memory > self.peak_memory {
91                self.peak_memory = current_memory;
92            }
93        }
94    }
95
96    /// Get peak memory usage in bytes
97    fn peak_memory_bytes(&self) -> usize {
98        self.peak_memory
99    }
100}
101
/// Storage diagnostics runner
///
/// Owns the [`BlockStore`] under test plus a human-readable label that is
/// echoed back in the generated [`DiagnosticsReport`].
pub struct StorageDiagnostics<S: BlockStore> {
    // Backend under test; all diagnostic reads/writes go through this store.
    store: S,
    // Label copied into `DiagnosticsReport::backend`.
    backend_name: String,
}
107
108impl<S: BlockStore> StorageDiagnostics<S> {
109    /// Create a new diagnostics runner
110    pub fn new(store: S, backend_name: String) -> Self {
111        Self {
112            store,
113            backend_name,
114        }
115    }
116
117    /// Run comprehensive diagnostics
118    ///
119    /// Tests include:
120    /// - Write/read latency measurements
121    /// - Batch operation performance
122    /// - Data integrity verification
123    /// - Storage responsiveness
124    /// - Memory usage tracking
125    pub async fn run(&mut self) -> Result<DiagnosticsReport> {
126        let mut successful_ops = 0;
127        let mut failed_ops = 0;
128
129        // Initialize memory tracker
130        let mut memory_tracker = MemoryTracker::new();
131        memory_tracker.update();
132
133        // Test data
134        let test_blocks = self.generate_test_data()?;
135        memory_tracker.update();
136
137        // Measure write performance
138        let write_start = Instant::now();
139        for block in &test_blocks {
140            match self.store.put(block).await {
141                Ok(_) => successful_ops += 1,
142                Err(_) => failed_ops += 1,
143            }
144        }
145        let write_duration = write_start.elapsed();
146        let avg_write_latency = write_duration / test_blocks.len() as u32;
147        memory_tracker.update();
148
149        // Measure read performance
150        let read_start = Instant::now();
151        let mut integrity_ok = true;
152        for block in &test_blocks {
153            match self.store.get(block.cid()).await {
154                Ok(Some(retrieved)) => {
155                    if retrieved.data() != block.data() {
156                        integrity_ok = false;
157                    }
158                    successful_ops += 1;
159                }
160                Ok(None) => {
161                    integrity_ok = false;
162                    failed_ops += 1;
163                }
164                Err(_) => failed_ops += 1,
165            }
166        }
167        let read_duration = read_start.elapsed();
168        let avg_read_latency = read_duration / test_blocks.len() as u32;
169        memory_tracker.update();
170
171        // Measure batch write performance
172        let batch_write_start = Instant::now();
173        let batch_result = self.store.put_many(&test_blocks).await;
174        let avg_batch_write_latency = batch_write_start.elapsed();
175        if batch_result.is_ok() {
176            successful_ops += test_blocks.len();
177        } else {
178            failed_ops += test_blocks.len();
179        }
180        memory_tracker.update();
181
182        // Measure batch read performance
183        let cids: Vec<Cid> = test_blocks.iter().map(|b| *b.cid()).collect();
184        let batch_read_start = Instant::now();
185        let _batch_read_result = self.store.get_many(&cids).await;
186        let avg_batch_read_latency = batch_read_start.elapsed();
187        memory_tracker.update();
188
189        // Calculate throughput
190        let write_throughput = test_blocks.len() as f64 / write_duration.as_secs_f64();
191        let read_throughput = test_blocks.len() as f64 / read_duration.as_secs_f64();
192
193        // Calculate success rate
194        let total_ops = successful_ops + failed_ops;
195        let success_rate = if total_ops > 0 {
196            successful_ops as f64 / total_ops as f64
197        } else {
198            0.0
199        };
200
201        // Check responsiveness
202        let responsive = avg_write_latency < Duration::from_secs(1)
203            && avg_read_latency < Duration::from_millis(500);
204
205        // Generate recommendations
206        let recommendations = self.generate_recommendations(
207            &avg_write_latency,
208            &avg_read_latency,
209            write_throughput,
210            read_throughput,
211            integrity_ok,
212            responsive,
213        );
214
215        // Calculate health score
216        let health_score = self.calculate_health_score(
217            success_rate,
218            integrity_ok,
219            responsive,
220            write_throughput,
221            read_throughput,
222        );
223
224        // Get peak memory usage
225        let peak_memory_usage = memory_tracker.peak_memory_bytes();
226
227        Ok(DiagnosticsReport {
228            backend: self.backend_name.clone(),
229            total_blocks: test_blocks.len(),
230            performance: PerformanceMetrics {
231                avg_write_latency,
232                avg_read_latency,
233                avg_batch_write_latency,
234                avg_batch_read_latency,
235                write_throughput,
236                read_throughput,
237                peak_memory_usage,
238            },
239            health: HealthMetrics {
240                successful_ops,
241                failed_ops,
242                success_rate,
243                integrity_ok,
244                responsive,
245            },
246            recommendations,
247            health_score,
248        })
249    }
250
251    /// Run quick health check (minimal overhead)
252    pub async fn quick_health_check(&mut self) -> Result<bool> {
253        // Test with a single small block
254        let test_data = vec![0u8; 1024];
255        let cid = crate::utils::compute_cid(&test_data);
256        let block = Block::from_parts(cid, test_data.into());
257
258        // Try write
259        self.store.put(&block).await?;
260
261        // Try read
262        let retrieved = self.store.get(&cid).await?;
263
264        // Verify
265        Ok(retrieved.is_some() && retrieved.unwrap().cid() == &cid)
266    }
267
268    /// Generate test data for diagnostics
269    fn generate_test_data(&self) -> Result<Vec<Block>> {
270        crate::utils::generate_mixed_size_blocks(5, 3, 2)
271    }
272
273    /// Generate recommendations based on metrics
274    #[allow(clippy::too_many_arguments)]
275    fn generate_recommendations(
276        &self,
277        avg_write_latency: &Duration,
278        avg_read_latency: &Duration,
279        write_throughput: f64,
280        read_throughput: f64,
281        integrity_ok: bool,
282        responsive: bool,
283    ) -> Vec<String> {
284        let mut recommendations = Vec::new();
285
286        if *avg_write_latency > Duration::from_millis(100) {
287            recommendations.push(
288                "High write latency detected. Consider enabling write coalescing or batch operations.".to_string()
289            );
290        }
291
292        if *avg_read_latency > Duration::from_millis(50) {
293            recommendations.push(
294                "High read latency detected. Consider enabling caching or bloom filters."
295                    .to_string(),
296            );
297        }
298
299        if write_throughput < 100.0 {
300            recommendations.push(
301                "Low write throughput. Consider using ParityDB backend or enabling compression."
302                    .to_string(),
303            );
304        }
305
306        if read_throughput < 200.0 {
307            recommendations.push(
308                "Low read throughput. Consider increasing cache size or using tiered caching."
309                    .to_string(),
310            );
311        }
312
313        if !integrity_ok {
314            recommendations.push(
315                "Data integrity issues detected! This is critical and should be investigated immediately.".to_string()
316            );
317        }
318
319        if !responsive {
320            recommendations.push(
321                "Storage backend is not responsive. Check system resources and backend configuration.".to_string()
322            );
323        }
324
325        if recommendations.is_empty() {
326            recommendations.push("Storage is performing well. No issues detected.".to_string());
327        }
328
329        recommendations
330    }
331
332    /// Calculate overall health score (0-100)
333    fn calculate_health_score(
334        &self,
335        success_rate: f64,
336        integrity_ok: bool,
337        responsive: bool,
338        write_throughput: f64,
339        read_throughput: f64,
340    ) -> u8 {
341        let mut score = 0u32;
342
343        // Success rate (40 points)
344        score += (success_rate * 40.0) as u32;
345
346        // Integrity (30 points)
347        if integrity_ok {
348            score += 30;
349        }
350
351        // Responsiveness (15 points)
352        if responsive {
353            score += 15;
354        }
355
356        // Write throughput (7.5 points)
357        if write_throughput >= 100.0 {
358            score += 7;
359        } else {
360            score += (write_throughput / 100.0 * 7.0) as u32;
361        }
362
363        // Read throughput (7.5 points)
364        if read_throughput >= 200.0 {
365            score += 8;
366        } else {
367            score += (read_throughput / 200.0 * 8.0) as u32;
368        }
369
370        score.min(100) as u8
371    }
372}
373
/// Benchmark comparison between different storage backends
///
/// Collects one [`DiagnosticsReport`] per named backend and answers
/// which backend is fastest/healthiest.
pub struct BenchmarkComparison {
    // Keyed by backend name; insert via `add_result`.
    results: HashMap<String, DiagnosticsReport>,
}
378
379impl BenchmarkComparison {
380    /// Create a new benchmark comparison
381    pub fn new() -> Self {
382        Self {
383            results: HashMap::new(),
384        }
385    }
386
387    /// Add a benchmark result
388    pub fn add_result(&mut self, name: String, report: DiagnosticsReport) {
389        self.results.insert(name, report);
390    }
391
392    /// Get the fastest backend for writes
393    pub fn fastest_write_backend(&self) -> Option<(&str, &DiagnosticsReport)> {
394        self.results
395            .iter()
396            .min_by_key(|(_, r)| r.performance.avg_write_latency)
397            .map(|(name, report)| (name.as_str(), report))
398    }
399
400    /// Get the fastest backend for reads
401    pub fn fastest_read_backend(&self) -> Option<(&str, &DiagnosticsReport)> {
402        self.results
403            .iter()
404            .min_by_key(|(_, r)| r.performance.avg_read_latency)
405            .map(|(name, report)| (name.as_str(), report))
406    }
407
408    /// Get the healthiest backend
409    pub fn healthiest_backend(&self) -> Option<(&str, &DiagnosticsReport)> {
410        self.results
411            .iter()
412            .max_by_key(|(_, r)| r.health_score)
413            .map(|(name, report)| (name.as_str(), report))
414    }
415
416    /// Generate a comparison summary
417    pub fn summary(&self) -> String {
418        let mut summary = String::from("=== Storage Backend Comparison ===\n\n");
419
420        for (name, report) in &self.results {
421            summary.push_str(&format!(
422                "{}: Health Score = {}/100\n",
423                name, report.health_score
424            ));
425            summary.push_str(&format!(
426                "  Write Latency: {:?}, Read Latency: {:?}\n",
427                report.performance.avg_write_latency, report.performance.avg_read_latency
428            ));
429            summary.push_str(&format!(
430                "  Write Throughput: {:.2} blocks/s, Read Throughput: {:.2} blocks/s\n\n",
431                report.performance.write_throughput, report.performance.read_throughput
432            ));
433        }
434
435        if let Some((name, _)) = self.fastest_write_backend() {
436            summary.push_str(&format!("Fastest for writes: {name}\n"));
437        }
438
439        if let Some((name, _)) = self.fastest_read_backend() {
440            summary.push_str(&format!("Fastest for reads: {name}\n"));
441        }
442
443        if let Some((name, _)) = self.healthiest_backend() {
444            summary.push_str(&format!("Healthiest overall: {name}\n"));
445        }
446
447        summary
448    }
449}
450
451impl Default for BenchmarkComparison {
452    fn default() -> Self {
453        Self::new()
454    }
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;

    #[tokio::test]
    async fn test_diagnostics_run() {
        let mut diagnostics =
            StorageDiagnostics::new(MemoryBlockStore::new(), "MemoryStore".to_string());

        let report = diagnostics.run().await.unwrap();

        // An in-memory store should round-trip all test blocks intact.
        assert_eq!(report.backend, "MemoryStore");
        assert!(report.health_score > 0);
        assert!(report.health.integrity_ok);
    }

    #[tokio::test]
    async fn test_quick_health_check() {
        let mut diagnostics =
            StorageDiagnostics::new(MemoryBlockStore::new(), "MemoryStore".to_string());

        // A single-block round trip must succeed against a memory store.
        assert!(diagnostics.quick_health_check().await.unwrap());
    }

    #[tokio::test]
    async fn test_benchmark_comparison() {
        let mut comparison = BenchmarkComparison::new();

        // Run diagnostics against two independent in-memory stores.
        for name in ["Memory1", "Memory2"] {
            let mut diag =
                StorageDiagnostics::new(MemoryBlockStore::new(), name.to_string());
            let report = diag.run().await.unwrap();
            comparison.add_result(name.to_string(), report);
        }

        assert!(comparison.fastest_write_backend().is_some());
        assert!(comparison.fastest_read_backend().is_some());
        assert!(comparison.healthiest_backend().is_some());
        assert!(comparison.summary().contains("Storage Backend Comparison"));
    }
}