1use crate::traits::BlockStore;
7use ipfrs_core::{Block, Cid, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{Duration, Instant};
11use sysinfo::{ProcessesToUpdate, System};
12
/// Aggregated results of a full storage diagnostics run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiagnosticsReport {
    /// Human-readable name of the storage backend under test.
    pub backend: String,
    /// Number of test blocks used during the run.
    pub total_blocks: usize,
    /// Latency, throughput, and memory measurements.
    pub performance: PerformanceMetrics,
    /// Operation success/failure counts plus integrity/responsiveness flags.
    pub health: HealthMetrics,
    /// Human-readable tuning suggestions derived from the metrics.
    pub recommendations: Vec<String>,
    /// Composite health score in the range 0-100 (higher is better).
    pub health_score: u8,
}
29
/// Latency, throughput, and memory statistics gathered during diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Mean latency of a single-block write.
    pub avg_write_latency: Duration,
    /// Mean latency of a single-block read.
    pub avg_read_latency: Duration,
    /// Total wall-clock time of the one batch-write call (despite the name,
    /// this is not divided by the number of blocks in the batch).
    pub avg_batch_write_latency: Duration,
    /// Total wall-clock time of the one batch-read call (same caveat as above).
    pub avg_batch_read_latency: Duration,
    /// Write throughput in blocks per second.
    pub write_throughput: f64,
    /// Read throughput in blocks per second.
    pub read_throughput: f64,
    /// Peak memory usage observed during the run, in bytes.
    pub peak_memory_usage: usize,
}
48
/// Reliability indicators for a diagnostics run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Number of storage operations that completed successfully.
    pub successful_ops: usize,
    /// Number of storage operations that failed (errors or missing blocks).
    pub failed_ops: usize,
    /// successful_ops / (successful_ops + failed_ops), or 0.0 when no ops ran.
    pub success_rate: f64,
    /// True when every block read back byte-identical to what was written.
    pub integrity_ok: bool,
    /// True when average latencies fall under coarse responsiveness thresholds.
    pub responsive: bool,
}
63
/// Tracks the peak memory usage of the current process via `sysinfo`.
struct MemoryTracker {
    /// Process-table snapshot, refreshed on every `update` call.
    system: System,
    /// PID of this process (the only one we ever query).
    pid: sysinfo::Pid,
    /// Highest memory reading observed so far, in bytes.
    peak_memory: usize,
}
70
71impl MemoryTracker {
72 fn new() -> Self {
74 let mut system = System::new();
75 system.refresh_processes(ProcessesToUpdate::All, true);
76 let pid = sysinfo::get_current_pid().unwrap();
77
78 Self {
79 system,
80 pid,
81 peak_memory: 0,
82 }
83 }
84
85 fn update(&mut self) {
87 self.system.refresh_processes(ProcessesToUpdate::All, true);
88 if let Some(process) = self.system.process(self.pid) {
89 let current_memory = process.memory() as usize;
90 if current_memory > self.peak_memory {
91 self.peak_memory = current_memory;
92 }
93 }
94 }
95
96 fn peak_memory_bytes(&self) -> usize {
98 self.peak_memory
99 }
100}
101
/// Runs performance and health diagnostics against a [`BlockStore`] backend.
pub struct StorageDiagnostics<S: BlockStore> {
    /// The store under test; diagnostics write and read through it directly.
    store: S,
    /// Label attached to generated reports (e.g. the backend's name).
    backend_name: String,
}
107
108impl<S: BlockStore> StorageDiagnostics<S> {
109 pub fn new(store: S, backend_name: String) -> Self {
111 Self {
112 store,
113 backend_name,
114 }
115 }
116
117 pub async fn run(&mut self) -> Result<DiagnosticsReport> {
126 let mut successful_ops = 0;
127 let mut failed_ops = 0;
128
129 let mut memory_tracker = MemoryTracker::new();
131 memory_tracker.update();
132
133 let test_blocks = self.generate_test_data()?;
135 memory_tracker.update();
136
137 let write_start = Instant::now();
139 for block in &test_blocks {
140 match self.store.put(block).await {
141 Ok(_) => successful_ops += 1,
142 Err(_) => failed_ops += 1,
143 }
144 }
145 let write_duration = write_start.elapsed();
146 let avg_write_latency = write_duration / test_blocks.len() as u32;
147 memory_tracker.update();
148
149 let read_start = Instant::now();
151 let mut integrity_ok = true;
152 for block in &test_blocks {
153 match self.store.get(block.cid()).await {
154 Ok(Some(retrieved)) => {
155 if retrieved.data() != block.data() {
156 integrity_ok = false;
157 }
158 successful_ops += 1;
159 }
160 Ok(None) => {
161 integrity_ok = false;
162 failed_ops += 1;
163 }
164 Err(_) => failed_ops += 1,
165 }
166 }
167 let read_duration = read_start.elapsed();
168 let avg_read_latency = read_duration / test_blocks.len() as u32;
169 memory_tracker.update();
170
171 let batch_write_start = Instant::now();
173 let batch_result = self.store.put_many(&test_blocks).await;
174 let avg_batch_write_latency = batch_write_start.elapsed();
175 if batch_result.is_ok() {
176 successful_ops += test_blocks.len();
177 } else {
178 failed_ops += test_blocks.len();
179 }
180 memory_tracker.update();
181
182 let cids: Vec<Cid> = test_blocks.iter().map(|b| *b.cid()).collect();
184 let batch_read_start = Instant::now();
185 let _batch_read_result = self.store.get_many(&cids).await;
186 let avg_batch_read_latency = batch_read_start.elapsed();
187 memory_tracker.update();
188
189 let write_throughput = test_blocks.len() as f64 / write_duration.as_secs_f64();
191 let read_throughput = test_blocks.len() as f64 / read_duration.as_secs_f64();
192
193 let total_ops = successful_ops + failed_ops;
195 let success_rate = if total_ops > 0 {
196 successful_ops as f64 / total_ops as f64
197 } else {
198 0.0
199 };
200
201 let responsive = avg_write_latency < Duration::from_secs(1)
203 && avg_read_latency < Duration::from_millis(500);
204
205 let recommendations = self.generate_recommendations(
207 &avg_write_latency,
208 &avg_read_latency,
209 write_throughput,
210 read_throughput,
211 integrity_ok,
212 responsive,
213 );
214
215 let health_score = self.calculate_health_score(
217 success_rate,
218 integrity_ok,
219 responsive,
220 write_throughput,
221 read_throughput,
222 );
223
224 let peak_memory_usage = memory_tracker.peak_memory_bytes();
226
227 Ok(DiagnosticsReport {
228 backend: self.backend_name.clone(),
229 total_blocks: test_blocks.len(),
230 performance: PerformanceMetrics {
231 avg_write_latency,
232 avg_read_latency,
233 avg_batch_write_latency,
234 avg_batch_read_latency,
235 write_throughput,
236 read_throughput,
237 peak_memory_usage,
238 },
239 health: HealthMetrics {
240 successful_ops,
241 failed_ops,
242 success_rate,
243 integrity_ok,
244 responsive,
245 },
246 recommendations,
247 health_score,
248 })
249 }
250
251 pub async fn quick_health_check(&mut self) -> Result<bool> {
253 let test_data = vec![0u8; 1024];
255 let cid = crate::utils::compute_cid(&test_data);
256 let block = Block::from_parts(cid, test_data.into());
257
258 self.store.put(&block).await?;
260
261 let retrieved = self.store.get(&cid).await?;
263
264 Ok(retrieved.is_some() && retrieved.unwrap().cid() == &cid)
266 }
267
268 fn generate_test_data(&self) -> Result<Vec<Block>> {
270 crate::utils::generate_mixed_size_blocks(5, 3, 2)
271 }
272
273 #[allow(clippy::too_many_arguments)]
275 fn generate_recommendations(
276 &self,
277 avg_write_latency: &Duration,
278 avg_read_latency: &Duration,
279 write_throughput: f64,
280 read_throughput: f64,
281 integrity_ok: bool,
282 responsive: bool,
283 ) -> Vec<String> {
284 let mut recommendations = Vec::new();
285
286 if *avg_write_latency > Duration::from_millis(100) {
287 recommendations.push(
288 "High write latency detected. Consider enabling write coalescing or batch operations.".to_string()
289 );
290 }
291
292 if *avg_read_latency > Duration::from_millis(50) {
293 recommendations.push(
294 "High read latency detected. Consider enabling caching or bloom filters."
295 .to_string(),
296 );
297 }
298
299 if write_throughput < 100.0 {
300 recommendations.push(
301 "Low write throughput. Consider using ParityDB backend or enabling compression."
302 .to_string(),
303 );
304 }
305
306 if read_throughput < 200.0 {
307 recommendations.push(
308 "Low read throughput. Consider increasing cache size or using tiered caching."
309 .to_string(),
310 );
311 }
312
313 if !integrity_ok {
314 recommendations.push(
315 "Data integrity issues detected! This is critical and should be investigated immediately.".to_string()
316 );
317 }
318
319 if !responsive {
320 recommendations.push(
321 "Storage backend is not responsive. Check system resources and backend configuration.".to_string()
322 );
323 }
324
325 if recommendations.is_empty() {
326 recommendations.push("Storage is performing well. No issues detected.".to_string());
327 }
328
329 recommendations
330 }
331
332 fn calculate_health_score(
334 &self,
335 success_rate: f64,
336 integrity_ok: bool,
337 responsive: bool,
338 write_throughput: f64,
339 read_throughput: f64,
340 ) -> u8 {
341 let mut score = 0u32;
342
343 score += (success_rate * 40.0) as u32;
345
346 if integrity_ok {
348 score += 30;
349 }
350
351 if responsive {
353 score += 15;
354 }
355
356 if write_throughput >= 100.0 {
358 score += 7;
359 } else {
360 score += (write_throughput / 100.0 * 7.0) as u32;
361 }
362
363 if read_throughput >= 200.0 {
365 score += 8;
366 } else {
367 score += (read_throughput / 200.0 * 8.0) as u32;
368 }
369
370 score.min(100) as u8
371 }
372}
373
/// Collects [`DiagnosticsReport`]s from multiple backends for side-by-side
/// comparison and winner selection.
pub struct BenchmarkComparison {
    /// Reports keyed by backend name.
    results: HashMap<String, DiagnosticsReport>,
}
378
379impl BenchmarkComparison {
380 pub fn new() -> Self {
382 Self {
383 results: HashMap::new(),
384 }
385 }
386
387 pub fn add_result(&mut self, name: String, report: DiagnosticsReport) {
389 self.results.insert(name, report);
390 }
391
392 pub fn fastest_write_backend(&self) -> Option<(&str, &DiagnosticsReport)> {
394 self.results
395 .iter()
396 .min_by_key(|(_, r)| r.performance.avg_write_latency)
397 .map(|(name, report)| (name.as_str(), report))
398 }
399
400 pub fn fastest_read_backend(&self) -> Option<(&str, &DiagnosticsReport)> {
402 self.results
403 .iter()
404 .min_by_key(|(_, r)| r.performance.avg_read_latency)
405 .map(|(name, report)| (name.as_str(), report))
406 }
407
408 pub fn healthiest_backend(&self) -> Option<(&str, &DiagnosticsReport)> {
410 self.results
411 .iter()
412 .max_by_key(|(_, r)| r.health_score)
413 .map(|(name, report)| (name.as_str(), report))
414 }
415
416 pub fn summary(&self) -> String {
418 let mut summary = String::from("=== Storage Backend Comparison ===\n\n");
419
420 for (name, report) in &self.results {
421 summary.push_str(&format!(
422 "{}: Health Score = {}/100\n",
423 name, report.health_score
424 ));
425 summary.push_str(&format!(
426 " Write Latency: {:?}, Read Latency: {:?}\n",
427 report.performance.avg_write_latency, report.performance.avg_read_latency
428 ));
429 summary.push_str(&format!(
430 " Write Throughput: {:.2} blocks/s, Read Throughput: {:.2} blocks/s\n\n",
431 report.performance.write_throughput, report.performance.read_throughput
432 ));
433 }
434
435 if let Some((name, _)) = self.fastest_write_backend() {
436 summary.push_str(&format!("Fastest for writes: {name}\n"));
437 }
438
439 if let Some((name, _)) = self.fastest_read_backend() {
440 summary.push_str(&format!("Fastest for reads: {name}\n"));
441 }
442
443 if let Some((name, _)) = self.healthiest_backend() {
444 summary.push_str(&format!("Healthiest overall: {name}\n"));
445 }
446
447 summary
448 }
449}
450
// `Default` mirrors `new()`: an empty comparison with no recorded results.
impl Default for BenchmarkComparison {
    fn default() -> Self {
        Self::new()
    }
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;

    // A full diagnostics run against the in-memory store should yield a
    // correctly labelled, non-zero-score, integrity-clean report.
    #[tokio::test]
    async fn test_diagnostics_run() {
        let backend = MemoryBlockStore::new();
        let mut diag = StorageDiagnostics::new(backend, "MemoryStore".to_string());

        let report = diag.run().await.unwrap();
        assert_eq!(report.backend, "MemoryStore");
        assert!(report.health_score > 0);
        assert!(report.health.integrity_ok);
    }

    // The quick probe should pass trivially for an in-memory store.
    #[tokio::test]
    async fn test_quick_health_check() {
        let backend = MemoryBlockStore::new();
        let mut diag = StorageDiagnostics::new(backend, "MemoryStore".to_string());

        assert!(diag.quick_health_check().await.unwrap());
    }

    // With two recorded reports, every "best backend" query must return a
    // winner and the summary must render its header.
    #[tokio::test]
    async fn test_benchmark_comparison() {
        let mut comparison = BenchmarkComparison::new();

        for label in ["Memory1", "Memory2"] {
            let mut diag =
                StorageDiagnostics::new(MemoryBlockStore::new(), label.to_string());
            let report = diag.run().await.unwrap();
            comparison.add_result(label.to_string(), report);
        }

        assert!(comparison.fastest_write_backend().is_some());
        assert!(comparison.fastest_read_backend().is_some());
        assert!(comparison.healthiest_backend().is_some());

        let summary = comparison.summary();
        assert!(summary.contains("Storage Backend Comparison"));
    }
}