1use std::collections::{HashMap, HashSet};
29use std::time::{Duration, Instant};
30use thiserror::Error;
31
32#[derive(Debug, Error)]
34pub enum RepairError {
35 #[error("Maximum repair retries exceeded for chunk {chunk_index}")]
36 MaxRetriesExceeded { chunk_index: usize },
37
38 #[error("No alternative sources available for chunk {chunk_index}")]
39 NoSourcesAvailable { chunk_index: usize },
40
41 #[error("Verification failed after repair for chunk {chunk_index}")]
42 VerificationFailed { chunk_index: usize },
43
44 #[error("Repair timeout exceeded for content {content_id}")]
45 TimeoutExceeded { content_id: String },
46
47 #[error("IO error during repair: {0}")]
48 IoError(#[from] std::io::Error),
49}
50
51#[derive(Debug, Clone)]
53pub struct ChunkRepairConfig {
54 pub max_retries: u32,
56 pub retry_delay: Duration,
58 pub verify_after_repair: bool,
60 pub max_repair_time: Duration,
62 pub min_sources: usize,
64 pub prioritize_repairs: bool,
66}
67
68impl Default for ChunkRepairConfig {
69 fn default() -> Self {
70 Self {
71 max_retries: 3,
72 retry_delay: Duration::from_secs(1),
73 verify_after_repair: true,
74 max_repair_time: Duration::from_secs(300),
75 min_sources: 2,
76 prioritize_repairs: true,
77 }
78 }
79}
80
81#[derive(Debug, Clone)]
83pub struct ChunkRepairRequest {
84 pub content_id: String,
86 pub failed_chunk_indices: Vec<usize>,
88 pub total_chunks: usize,
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
94pub enum ChunkRepairStatus {
95 Pending,
97 InProgress,
99 Repaired,
101 Failed,
103 Skipped,
105}
106
107#[derive(Debug, Clone, Default)]
109pub struct ChunkRepairStats {
110 pub total_attempts: usize,
112 pub successful_repairs: usize,
114 pub failed_repairs: usize,
116 pub skipped_repairs: usize,
118 pub bytes_repaired: u64,
120 pub avg_repair_time_ms: u64,
122}
123
124#[derive(Debug)]
126struct ChunkRepairState {
127 index: usize,
128 status: ChunkRepairStatus,
129 retry_count: u32,
130 last_attempt: Option<Instant>,
131 sources_tried: HashSet<String>,
132}
133
134impl ChunkRepairState {
135 fn new(index: usize) -> Self {
136 Self {
137 index,
138 status: ChunkRepairStatus::Pending,
139 retry_count: 0,
140 last_attempt: None,
141 sources_tried: HashSet::new(),
142 }
143 }
144
145 #[inline]
146 fn can_retry(&self, config: &ChunkRepairConfig) -> bool {
147 self.retry_count < config.max_retries
148 }
149
150 #[inline]
151 fn should_retry(&self, config: &ChunkRepairConfig) -> bool {
152 if !self.can_retry(config) {
153 return false;
154 }
155
156 if let Some(last) = self.last_attempt {
157 last.elapsed() >= config.retry_delay
158 } else {
159 true
160 }
161 }
162
163 #[inline]
164 fn mark_attempt(&mut self, source: String) {
165 self.status = ChunkRepairStatus::InProgress;
166 self.retry_count += 1;
167 self.last_attempt = Some(Instant::now());
168 self.sources_tried.insert(source);
169 }
170}
171
172pub struct ChunkRepairStrategy {
174 config: ChunkRepairConfig,
175 chunk_states: HashMap<usize, ChunkRepairState>,
176 stats: ChunkRepairStats,
177 started_at: Option<Instant>,
178}
179
180impl ChunkRepairStrategy {
181 #[must_use]
183 #[inline]
184 pub fn new(config: ChunkRepairConfig) -> Self {
185 Self {
186 config,
187 chunk_states: HashMap::new(),
188 stats: ChunkRepairStats::default(),
189 started_at: None,
190 }
191 }
192
193 #[inline]
195 pub fn initialize_repair(&mut self, request: ChunkRepairRequest) {
196 self.started_at = Some(Instant::now());
197
198 for &index in &request.failed_chunk_indices {
199 if index < request.total_chunks {
200 self.chunk_states
201 .insert(index, ChunkRepairState::new(index));
202 }
203 }
204
205 self.stats.total_attempts = request.failed_chunk_indices.len();
206 }
207
208 #[must_use]
210 pub fn next_repair_candidate(&mut self) -> Option<usize> {
211 if let Some(started) = self.started_at {
213 if started.elapsed() >= self.config.max_repair_time {
214 return None;
215 }
216 }
217
218 let mut candidates: Vec<_> = self
220 .chunk_states
221 .values()
222 .filter(|state| {
223 matches!(
224 state.status,
225 ChunkRepairStatus::Pending | ChunkRepairStatus::InProgress
226 ) && state.should_retry(&self.config)
227 })
228 .collect();
229
230 if candidates.is_empty() {
231 return None;
232 }
233
234 if self.config.prioritize_repairs {
236 candidates.sort_by_key(|state| state.retry_count);
237 }
238
239 candidates.first().map(|state| state.index)
240 }
241
242 #[inline]
244 pub fn mark_repair_attempt(&mut self, chunk_index: usize, source: String) {
245 if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
246 state.mark_attempt(source);
247 }
248 }
249
250 #[inline]
252 pub fn mark_repaired(&mut self, chunk_index: usize, bytes_repaired: u64) {
253 if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
254 state.status = ChunkRepairStatus::Repaired;
255 self.stats.successful_repairs += 1;
256 self.stats.bytes_repaired += bytes_repaired;
257 }
258 }
259
260 #[inline]
262 pub fn mark_failed(&mut self, chunk_index: usize) {
263 if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
264 if !state.can_retry(&self.config) {
265 state.status = ChunkRepairStatus::Failed;
266 self.stats.failed_repairs += 1;
267 }
268 }
269 }
270
271 #[inline]
273 pub fn mark_skipped(&mut self, chunk_index: usize) {
274 if let Some(state) = self.chunk_states.get_mut(&chunk_index) {
275 state.status = ChunkRepairStatus::Skipped;
276 self.stats.skipped_repairs += 1;
277 }
278 }
279
280 #[must_use]
282 #[inline]
283 pub fn is_complete(&self) -> bool {
284 self.chunk_states.values().all(|state| {
285 !matches!(
286 state.status,
287 ChunkRepairStatus::Pending | ChunkRepairStatus::InProgress
288 )
289 })
290 }
291
292 #[must_use]
294 #[inline]
295 pub fn stats(&self) -> &ChunkRepairStats {
296 &self.stats
297 }
298
299 #[must_use]
301 #[inline]
302 pub fn chunk_status(&self, index: usize) -> Option<&ChunkRepairStatus> {
303 self.chunk_states.get(&index).map(|state| &state.status)
304 }
305
306 #[must_use]
308 #[inline]
309 pub fn pending_repairs(&self) -> Vec<usize> {
310 self.chunk_states
311 .iter()
312 .filter(|(_, state)| {
313 matches!(
314 state.status,
315 ChunkRepairStatus::Pending | ChunkRepairStatus::InProgress
316 )
317 })
318 .map(|(index, _)| *index)
319 .collect()
320 }
321
322 #[must_use]
324 #[inline]
325 pub fn repaired_chunks(&self) -> Vec<usize> {
326 self.chunk_states
327 .iter()
328 .filter(|(_, state)| state.status == ChunkRepairStatus::Repaired)
329 .map(|(index, _)| *index)
330 .collect()
331 }
332
333 #[must_use]
335 #[inline]
336 pub fn failed_chunks(&self) -> Vec<usize> {
337 self.chunk_states
338 .iter()
339 .filter(|(_, state)| state.status == ChunkRepairStatus::Failed)
340 .map(|(index, _)| *index)
341 .collect()
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348
349 #[test]
350 fn test_repair_config_defaults() {
351 let config = ChunkRepairConfig::default();
352 assert_eq!(config.max_retries, 3);
353 assert!(config.verify_after_repair);
354 assert_eq!(config.min_sources, 2);
355 }
356
357 #[test]
358 fn test_repair_strategy_initialization() {
359 let config = ChunkRepairConfig::default();
360 let mut strategy = ChunkRepairStrategy::new(config);
361
362 let request = ChunkRepairRequest {
363 content_id: "QmTest".to_string(),
364 failed_chunk_indices: vec![0, 1, 2],
365 total_chunks: 10,
366 };
367
368 strategy.initialize_repair(request);
369 assert_eq!(strategy.stats.total_attempts, 3);
370 assert!(!strategy.is_complete());
371 }
372
373 #[test]
374 fn test_next_repair_candidate() {
375 let config = ChunkRepairConfig {
376 retry_delay: Duration::from_millis(1),
377 ..Default::default()
378 };
379 let mut strategy = ChunkRepairStrategy::new(config);
380
381 let request = ChunkRepairRequest {
382 content_id: "QmTest".to_string(),
383 failed_chunk_indices: vec![0, 1],
384 total_chunks: 10,
385 };
386
387 strategy.initialize_repair(request);
388
389 let candidate = strategy.next_repair_candidate();
390 assert!(candidate.is_some());
391 assert!(candidate.unwrap() == 0 || candidate.unwrap() == 1);
392 }
393
394 #[test]
395 fn test_mark_repaired() {
396 let config = ChunkRepairConfig::default();
397 let mut strategy = ChunkRepairStrategy::new(config);
398
399 let request = ChunkRepairRequest {
400 content_id: "QmTest".to_string(),
401 failed_chunk_indices: vec![0],
402 total_chunks: 10,
403 };
404
405 strategy.initialize_repair(request);
406 strategy.mark_repaired(0, 1024);
407
408 assert_eq!(strategy.stats.successful_repairs, 1);
409 assert_eq!(strategy.stats.bytes_repaired, 1024);
410 assert_eq!(strategy.chunk_status(0), Some(&ChunkRepairStatus::Repaired));
411 }
412
413 #[test]
414 fn test_mark_failed() {
415 let config = ChunkRepairConfig {
416 max_retries: 1,
417 ..Default::default()
418 };
419 let mut strategy = ChunkRepairStrategy::new(config);
420
421 let request = ChunkRepairRequest {
422 content_id: "QmTest".to_string(),
423 failed_chunk_indices: vec![0],
424 total_chunks: 10,
425 };
426
427 strategy.initialize_repair(request);
428 strategy.mark_repair_attempt(0, "peer1".to_string());
429 strategy.mark_failed(0);
430
431 let candidate = strategy.next_repair_candidate();
433 assert!(candidate.is_none() || candidate == Some(0));
434 }
435
436 #[test]
437 fn test_repair_completion() {
438 let config = ChunkRepairConfig::default();
439 let mut strategy = ChunkRepairStrategy::new(config);
440
441 let request = ChunkRepairRequest {
442 content_id: "QmTest".to_string(),
443 failed_chunk_indices: vec![0, 1],
444 total_chunks: 10,
445 };
446
447 strategy.initialize_repair(request);
448 assert!(!strategy.is_complete());
449
450 strategy.mark_repaired(0, 1024);
451 assert!(!strategy.is_complete());
452
453 strategy.mark_repaired(1, 1024);
454 assert!(strategy.is_complete());
455 }
456
457 #[test]
458 fn test_pending_and_repaired_lists() {
459 let config = ChunkRepairConfig::default();
460 let mut strategy = ChunkRepairStrategy::new(config);
461
462 let request = ChunkRepairRequest {
463 content_id: "QmTest".to_string(),
464 failed_chunk_indices: vec![0, 1, 2],
465 total_chunks: 10,
466 };
467
468 strategy.initialize_repair(request);
469
470 let pending = strategy.pending_repairs();
471 assert_eq!(pending.len(), 3);
472
473 strategy.mark_repaired(0, 1024);
474
475 let pending = strategy.pending_repairs();
476 assert_eq!(pending.len(), 2);
477
478 let repaired = strategy.repaired_chunks();
479 assert_eq!(repaired.len(), 1);
480 assert!(repaired.contains(&0));
481 }
482
483 #[test]
484 fn test_mark_skipped() {
485 let config = ChunkRepairConfig::default();
486 let mut strategy = ChunkRepairStrategy::new(config);
487
488 let request = ChunkRepairRequest {
489 content_id: "QmTest".to_string(),
490 failed_chunk_indices: vec![0],
491 total_chunks: 10,
492 };
493
494 strategy.initialize_repair(request);
495 strategy.mark_skipped(0);
496
497 assert_eq!(strategy.stats.skipped_repairs, 1);
498 assert_eq!(strategy.chunk_status(0), Some(&ChunkRepairStatus::Skipped));
499 }
500
501 #[test]
502 fn test_retry_logic() {
503 let config = ChunkRepairConfig {
504 max_retries: 2,
505 retry_delay: Duration::from_millis(1),
506 ..Default::default()
507 };
508 let mut strategy = ChunkRepairStrategy::new(config);
509
510 let request = ChunkRepairRequest {
511 content_id: "QmTest".to_string(),
512 failed_chunk_indices: vec![0],
513 total_chunks: 10,
514 };
515
516 strategy.initialize_repair(request);
517
518 strategy.mark_repair_attempt(0, "peer1".to_string());
520 strategy.mark_failed(0);
521
522 std::thread::sleep(Duration::from_millis(2));
524 let candidate = strategy.next_repair_candidate();
525 assert_eq!(candidate, Some(0));
526
527 strategy.mark_repair_attempt(0, "peer2".to_string());
529 strategy.mark_failed(0);
530
531 let state = strategy.chunk_states.get(&0).unwrap();
533 assert_eq!(state.status, ChunkRepairStatus::Failed);
534 }
535}