chie_core/
auto_repair.rs

1//! Automatic data integrity repair for corrupted chunks.
2//!
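//! This module tracks the repair of corrupted content chunks: it decides which
//! chunk to retry next, enforces retry and time limits, and records the outcome
//! of each repair. Re-fetching chunks from alternative sources and verifying
//! their integrity is left to the caller, which reports results back.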
5//!
6//! # Example
7//!
8//! ```
9//! use chie_core::auto_repair::{ChunkRepairStrategy, ChunkRepairConfig, ChunkRepairRequest};
10//! use std::time::Duration;
11//!
12//! // Configure repair strategy
13//! let config = ChunkRepairConfig {
14//!     max_retries: 3,
15//!     retry_delay: Duration::from_millis(100),
16//!     verify_after_repair: true,
17//!     ..Default::default()
18//! };
19//!
20//! // Create repair request for failed chunks
21//! let request = ChunkRepairRequest {
22//!     content_id: "QmTest".to_string(),
23//!     failed_chunk_indices: vec![0, 5, 10],
24//!     total_chunks: 100,
25//! };
26//! ```
27
28use std::collections::{HashMap, HashSet};
29use std::time::{Duration, Instant};
30use thiserror::Error;
31
32/// Errors that can occur during chunk repair.
33#[derive(Debug, Error)]
34pub enum RepairError {
35    #[error("Maximum repair retries exceeded for chunk {chunk_index}")]
36    MaxRetriesExceeded { chunk_index: usize },
37
38    #[error("No alternative sources available for chunk {chunk_index}")]
39    NoSourcesAvailable { chunk_index: usize },
40
41    #[error("Verification failed after repair for chunk {chunk_index}")]
42    VerificationFailed { chunk_index: usize },
43
44    #[error("Repair timeout exceeded for content {content_id}")]
45    TimeoutExceeded { content_id: String },
46
47    #[error("IO error during repair: {0}")]
48    IoError(#[from] std::io::Error),
49}
50
/// Configuration for automatic chunk repair operations.
///
/// Implements `PartialEq`/`Eq` so configurations can be compared directly
/// (for example in tests or when detecting configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRepairConfig {
    /// Maximum number of retry attempts per chunk.
    ///
    /// A chunk whose recorded attempt count has reached this value is no
    /// longer offered for repair.
    pub max_retries: u32,
    /// Delay between retry attempts.
    ///
    /// A chunk is not offered again until this much time has passed since its
    /// last recorded attempt.
    pub retry_delay: Duration,
    /// Whether to verify chunks after repair.
    ///
    /// NOTE(review): not read by the scheduling code visible in this file;
    /// presumably honoured by the caller that performs the repair — confirm.
    pub verify_after_repair: bool,
    /// Maximum time to spend on repair operations, measured from the moment
    /// repair is initialized. Once exceeded, no further candidates are offered.
    pub max_repair_time: Duration,
    /// Minimum number of alternative sources required.
    ///
    /// NOTE(review): not read by the scheduling code visible in this file;
    /// presumably enforced by the caller — confirm.
    pub min_sources: usize,
    /// Whether to prioritize repairs; when enabled, chunks with fewer recorded
    /// attempts are offered first.
    pub prioritize_repairs: bool,
}
67
68impl Default for ChunkRepairConfig {
69    fn default() -> Self {
70        Self {
71            max_retries: 3,
72            retry_delay: Duration::from_secs(1),
73            verify_after_repair: true,
74            max_repair_time: Duration::from_secs(300),
75            min_sources: 2,
76            prioritize_repairs: true,
77        }
78    }
79}
80
/// Request to repair specific chunks of content.
///
/// Implements `PartialEq`/`Eq` so requests can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRepairRequest {
    /// Content identifier.
    ///
    /// NOTE(review): carried with the request but not read by the strategy
    /// code visible in this file.
    pub content_id: String,
    /// Indices of chunks that need repair.
    ///
    /// Only indices below `total_chunks` are tracked when the request is
    /// handed to the repair strategy.
    pub failed_chunk_indices: Vec<usize>,
    /// Total number of chunks in content; the exclusive upper bound for a
    /// valid chunk index.
    pub total_chunks: usize,
}
91
/// Status of a chunk repair operation.
///
/// The enum is fieldless, so it is `Copy` and `Hash` in addition to being
/// comparable; callers can pass it by value and use it as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkRepairStatus {
    /// Repair is pending: the chunk is tracked but no attempt has been
    /// recorded yet.
    Pending,
    /// Repair is in progress: at least one attempt has been recorded and the
    /// chunk has not yet been repaired, failed or skipped.
    InProgress,
    /// Repair completed successfully.
    Repaired,
    /// Repair failed.
    Failed,
    /// Repair skipped (e.g., no sources available).
    Skipped,
}
106
/// Statistics for chunk repair operations.
///
/// All counters start at zero (`Default`). Implements `PartialEq`/`Eq` so
/// snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChunkRepairStats {
    /// Total chunks attempted to repair; set when repair is initialized.
    pub total_attempts: usize,
    /// Successfully repaired chunks.
    pub successful_repairs: usize,
    /// Chunks whose repair was given up on after the retry budget ran out.
    pub failed_repairs: usize,
    /// Skipped repairs.
    pub skipped_repairs: usize,
    /// Total bytes repaired.
    pub bytes_repaired: u64,
    /// Average repair time per chunk, in milliseconds.
    ///
    /// NOTE(review): nothing in the visible part of this file writes this
    /// field, so it appears to stay at 0 — confirm whether it is maintained
    /// elsewhere.
    pub avg_repair_time_ms: u64,
}
123
124/// Tracks the repair state for a single chunk.
125#[derive(Debug)]
126struct ChunkRepairState {
127    index: usize,
128    status: ChunkRepairStatus,
129    retry_count: u32,
130    last_attempt: Option<Instant>,
131    sources_tried: HashSet<String>,
132}
133
134impl ChunkRepairState {
135    fn new(index: usize) -> Self {
136        Self {
137            index,
138            status: ChunkRepairStatus::Pending,
139            retry_count: 0,
140            last_attempt: None,
141            sources_tried: HashSet::new(),
142        }
143    }
144
145    #[inline]
146    fn can_retry(&self, config: &ChunkRepairConfig) -> bool {
147        self.retry_count < config.max_retries
148    }
149
150    #[inline]
151    fn should_retry(&self, config: &ChunkRepairConfig) -> bool {
152        if !self.can_retry(config) {
153            return false;
154        }
155
156        if let Some(last) = self.last_attempt {
157            last.elapsed() >= config.retry_delay
158        } else {
159            true
160        }
161    }
162
163    #[inline]
164    fn mark_attempt(&mut self, source: String) {
165        self.status = ChunkRepairStatus::InProgress;
166        self.retry_count += 1;
167        self.last_attempt = Some(Instant::now());
168        self.sources_tried.insert(source);
169    }
170}
171
172/// Strategy for repairing corrupted chunks.
173pub struct ChunkRepairStrategy {
174    config: ChunkRepairConfig,
175    chunk_states: HashMap<usize, ChunkRepairState>,
176    stats: ChunkRepairStats,
177    started_at: Option<Instant>,
178}
179
180impl ChunkRepairStrategy {
181    /// Create a new repair strategy with the given configuration.
182    #[must_use]
183    #[inline]
184    pub fn new(config: ChunkRepairConfig) -> Self {
185        Self {
186            config,
187            chunk_states: HashMap::new(),
188            stats: ChunkRepairStats::default(),
189            started_at: None,
190        }
191    }
192
193    /// Initialize repair for a set of failed chunks.
194    #[inline]
195    pub fn initialize_repair(&mut self, request: ChunkRepairRequest) {
196        self.started_at = Some(Instant::now());
197
198        for &index in &request.failed_chunk_indices {
199            if index < request.total_chunks {
200                self.chunk_states
201                    .insert(index, ChunkRepairState::new(index));
202            }
203        }
204
205        self.stats.total_attempts = request.failed_chunk_indices.len();
206    }
207
208    /// Get the next chunk that should be repaired.
209    #[must_use]
210    pub fn next_repair_candidate(&mut self) -> Option<usize> {
211        // Check for timeout
212        if let Some(started) = self.started_at {
213            if started.elapsed() >= self.config.max_repair_time {
214                return None;
215            }
216        }
217
218        // Find chunks that are ready for retry
219        let mut candidates: Vec<_> = self
220            .chunk_states
221            .values()
222            .filter(|state| {
223                matches!(
224                    state.status,
225                    ChunkRepairStatus::Pending | ChunkRepairStatus::InProgress
226                ) && state.should_retry(&self.config)
227            })
228            .collect();
229
230        if candidates.is_empty() {
231            return None;
232        }
233
234        // Prioritize by retry count (fewer retries first)
235        if self.config.prioritize_repairs {
236            candidates.sort_by_key(|state| state.retry_count);
237        }
238
239        candidates.first().map(|state| state.index)
240    }
241
242    /// Mark a repair attempt as started.
243    #[inline]
244    pub fn mark_repair_attempt(&mut self, chunk_index: usize, source: String) {
245        if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
246            state.mark_attempt(source);
247        }
248    }
249
250    /// Mark a chunk as successfully repaired.
251    #[inline]
252    pub fn mark_repaired(&mut self, chunk_index: usize, bytes_repaired: u64) {
253        if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
254            state.status = ChunkRepairStatus::Repaired;
255            self.stats.successful_repairs += 1;
256            self.stats.bytes_repaired += bytes_repaired;
257        }
258    }
259
260    /// Mark a chunk repair as failed.
261    #[inline]
262    pub fn mark_failed(&mut self, chunk_index: usize) {
263        if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
264            if !state.can_retry(&self.config) {
265                state.status = ChunkRepairStatus::Failed;
266                self.stats.failed_repairs += 1;
267            }
268        }
269    }
270
271    /// Mark a chunk repair as skipped.
272    #[inline]
273    pub fn mark_skipped(&mut self, chunk_index: usize) {
274        if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
275            state.status = ChunkRepairStatus::Skipped;
276            self.stats.skipped_repairs += 1;
277        }
278    }
279
280    /// Check if all repairs are complete.
281    #[must_use]
282    #[inline]
283    pub fn is_complete(&self) -> bool {
284        self.chunk_states.values().all(|state| {
285            !matches!(
286                state.status,
287                ChunkRepairStatus::Pending | ChunkRepairStatus::InProgress
288            )
289        })
290    }
291
292    /// Get the current repair statistics.
293    #[must_use]
294    #[inline]
295    pub fn stats(&self) -> &ChunkRepairStats {
296        &self.stats
297    }
298
299    /// Get the status of a specific chunk.
300    #[must_use]
301    #[inline]
302    pub fn chunk_status(&self, index: usize) -> Option<&ChunkRepairStatus> {
303        self.chunk_states.get(&index).map(|state| &state.status)
304    }
305
306    /// Get all chunks that still need repair.
307    #[must_use]
308    #[inline]
309    pub fn pending_repairs(&self) -> Vec<usize> {
310        self.chunk_states
311            .iter()
312            .filter(|(_, state)| {
313                matches!(
314                    state.status,
315                    ChunkRepairStatus::Pending | ChunkRepairStatus::InProgress
316                )
317            })
318            .map(|(index, _)| *index)
319            .collect()
320    }
321
322    /// Get chunks that have been successfully repaired.
323    #[must_use]
324    #[inline]
325    pub fn repaired_chunks(&self) -> Vec<usize> {
326        self.chunk_states
327            .iter()
328            .filter(|(_, state)| state.status == ChunkRepairStatus::Repaired)
329            .map(|(index, _)| *index)
330            .collect()
331    }
332
333    /// Get chunks that failed repair.
334    #[must_use]
335    #[inline]
336    pub fn failed_chunks(&self) -> Vec<usize> {
337        self.chunk_states
338            .iter()
339            .filter(|(_, state)| state.status == ChunkRepairStatus::Failed)
340            .map(|(index, _)| *index)
341            .collect()
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a request for content "QmTest" (10 chunks) with the given
    /// failed chunk indices.
    fn request_for(failed_chunk_indices: Vec<usize>) -> ChunkRepairRequest {
        ChunkRepairRequest {
            content_id: "QmTest".to_string(),
            failed_chunk_indices,
            total_chunks: 10,
        }
    }

    #[test]
    fn test_repair_config_defaults() {
        let config = ChunkRepairConfig::default();
        assert_eq!(config.max_retries, 3);
        assert!(config.verify_after_repair);
        assert_eq!(config.min_sources, 2);
    }

    #[test]
    fn test_repair_strategy_initialization() {
        let mut strategy = ChunkRepairStrategy::new(ChunkRepairConfig::default());

        strategy.initialize_repair(request_for(vec![0, 1, 2]));

        assert_eq!(strategy.stats.total_attempts, 3);
        assert!(!strategy.is_complete());
    }

    #[test]
    fn test_next_repair_candidate() {
        let config = ChunkRepairConfig {
            retry_delay: Duration::from_millis(1),
            ..Default::default()
        };
        let mut strategy = ChunkRepairStrategy::new(config);

        strategy.initialize_repair(request_for(vec![0, 1]));

        // Both chunks are untouched, so either may be offered first.
        let candidate = strategy.next_repair_candidate();
        assert!(matches!(candidate, Some(0) | Some(1)));
    }

    #[test]
    fn test_mark_repaired() {
        let mut strategy = ChunkRepairStrategy::new(ChunkRepairConfig::default());

        strategy.initialize_repair(request_for(vec![0]));
        strategy.mark_repaired(0, 1024);

        assert_eq!(strategy.stats.successful_repairs, 1);
        assert_eq!(strategy.stats.bytes_repaired, 1024);
        assert_eq!(strategy.chunk_status(0), Some(&ChunkRepairStatus::Repaired));
    }

    #[test]
    fn test_mark_failed() {
        let config = ChunkRepairConfig {
            max_retries: 1,
            ..Default::default()
        };
        let mut strategy = ChunkRepairStrategy::new(config);

        strategy.initialize_repair(request_for(vec![0]));
        strategy.mark_repair_attempt(0, "peer1".to_string());
        strategy.mark_failed(0);

        // The single permitted attempt has been used, so the chunk is
        // terminally failed and must not be offered again.
        assert_eq!(strategy.chunk_status(0), Some(&ChunkRepairStatus::Failed));
        assert_eq!(strategy.stats.failed_repairs, 1);
        assert_eq!(strategy.failed_chunks(), vec![0]);
        assert_eq!(strategy.next_repair_candidate(), None);
        assert!(strategy.is_complete());
    }

    #[test]
    fn test_repair_completion() {
        let mut strategy = ChunkRepairStrategy::new(ChunkRepairConfig::default());

        strategy.initialize_repair(request_for(vec![0, 1]));
        assert!(!strategy.is_complete());

        strategy.mark_repaired(0, 1024);
        assert!(!strategy.is_complete());

        strategy.mark_repaired(1, 1024);
        assert!(strategy.is_complete());
    }

    #[test]
    fn test_pending_and_repaired_lists() {
        let mut strategy = ChunkRepairStrategy::new(ChunkRepairConfig::default());

        strategy.initialize_repair(request_for(vec![0, 1, 2]));
        assert_eq!(strategy.pending_repairs().len(), 3);

        strategy.mark_repaired(0, 1024);

        let pending = strategy.pending_repairs();
        assert_eq!(pending.len(), 2);
        assert!(!pending.contains(&0));

        let repaired = strategy.repaired_chunks();
        assert_eq!(repaired.len(), 1);
        assert!(repaired.contains(&0));
    }

    #[test]
    fn test_mark_skipped() {
        let mut strategy = ChunkRepairStrategy::new(ChunkRepairConfig::default());

        strategy.initialize_repair(request_for(vec![0]));
        strategy.mark_skipped(0);

        assert_eq!(strategy.stats.skipped_repairs, 1);
        assert_eq!(strategy.chunk_status(0), Some(&ChunkRepairStatus::Skipped));
    }

    #[test]
    fn test_retry_logic() {
        let config = ChunkRepairConfig {
            max_retries: 2,
            retry_delay: Duration::from_millis(1),
            ..Default::default()
        };
        let mut strategy = ChunkRepairStrategy::new(config);

        strategy.initialize_repair(request_for(vec![0]));

        // First attempt fails, but one retry is still available.
        strategy.mark_repair_attempt(0, "peer1".to_string());
        strategy.mark_failed(0);
        assert_eq!(
            strategy.chunk_status(0),
            Some(&ChunkRepairStatus::InProgress)
        );

        // Once the retry delay has passed the chunk is offered again.
        std::thread::sleep(Duration::from_millis(2));
        assert_eq!(strategy.next_repair_candidate(), Some(0));

        // Second attempt uses up the budget (max_retries = 2).
        strategy.mark_repair_attempt(0, "peer2".to_string());
        strategy.mark_failed(0);

        // No retries remain, so the chunk is now terminally failed.
        let state = strategy.chunk_states.get(&0).unwrap();
        assert_eq!(state.status, ChunkRepairStatus::Failed);
        assert_eq!(state.retry_count, 2);
    }
}
535}