// rust_par2/repair.rs
1//! PAR2 repair engine.
2//!
3//! Repairs damaged or missing files using Reed-Solomon recovery data.
4//!
5//! Algorithm:
6//! 1. Verify to identify damaged/missing blocks
7//! 2. Load recovery blocks from volume files
8//! 3. Build and invert the decode matrix over GF(2^16)
9//! 4. Apply the inverse to recovery data to reconstruct original blocks
10//! 5. Write repaired blocks back to files
11
12use std::collections::hash_map::Entry;
13use std::collections::HashMap;
14use std::fmt;
15use std::io::{Read, Seek, SeekFrom, Write};
16use std::path::Path;
17
18use rayon::prelude::*;
19use tracing::{debug, info};
20
21use crate::gf_simd;
22use crate::matrix::GfMatrix;
23use crate::recovery::{load_recovery_blocks, RecoveryBlock};
24use crate::types::{Par2FileSet, VerifyResult};
25use crate::verify;
26
/// Outcome of a repair attempt.
#[derive(Debug)]
pub struct RepairResult {
    /// True when every file in the set verified intact after repair.
    pub success: bool,
    /// How many blocks were reconstructed and written back.
    pub blocks_repaired: u32,
    /// How many distinct files had at least one block written.
    pub files_repaired: usize,
    /// Human-readable summary of the outcome.
    pub message: String,
}
39
40impl fmt::Display for RepairResult {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        if self.success {
43            write!(
44                f,
45                "Repair complete: {} blocks repaired across {} files",
46                self.blocks_repaired, self.files_repaired
47            )
48        } else {
49            write!(f, "Repair failed: {}", self.message)
50        }
51    }
52}
53
/// Errors that can occur during repair.
#[derive(Debug, thiserror::Error)]
pub enum RepairError {
    /// An underlying I/O operation failed while reading sources or writing repairs.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Fewer recovery blocks were loaded than there are damaged/missing source blocks.
    #[error("Insufficient recovery data: need {needed} blocks, have {available}")]
    InsufficientRecovery { needed: u32, available: u32 },
    /// The selected recovery rows produced a decode matrix that could not be inverted.
    #[error("Decode matrix is singular — cannot repair with these recovery blocks")]
    SingularMatrix,
    /// The verification pass found nothing wrong, so there is nothing to repair.
    #[error("No damage detected — nothing to repair")]
    NoDamage,
    /// The re-verification after writing repaired blocks still reported damage.
    #[error("Verification after repair failed: {0}")]
    VerifyFailed(String),
}
68
69/// Repair damaged/missing files in a PAR2 set.
70///
71/// This is a blocking operation. For async contexts, wrap in `spawn_blocking`.
72///
73/// Runs verification internally to identify damage. If you already have a
74/// [`VerifyResult`] from a prior [`verify()`](crate::verify) call, use
75/// [`repair_from_verify`] instead to skip the redundant verification pass.
76pub fn repair(file_set: &Par2FileSet, dir: &Path) -> Result<RepairResult, RepairError> {
77    let verify_result = verify::verify(file_set, dir);
78    repair_from_verify(file_set, dir, &verify_result)
79}
80
81/// Repair using a pre-computed [`VerifyResult`].
82///
83/// This skips the initial verification pass, saving significant time when the
84/// caller has already called [`verify()`](crate::verify). The `verify_result`
85/// must have been computed against the same `file_set` and `dir`.
86///
87/// This is a blocking operation. For async contexts, wrap in `spawn_blocking`.
88pub fn repair_from_verify(
89    file_set: &Par2FileSet,
90    dir: &Path,
91    verify_result: &VerifyResult,
92) -> Result<RepairResult, RepairError> {
93    if verify_result.all_correct() {
94        return Err(RepairError::NoDamage);
95    }
96
97    let blocks_needed = verify_result.blocks_needed();
98    info!(
99        blocks_needed,
100        damaged = verify_result.damaged.len(),
101        missing = verify_result.missing.len(),
102        "Repair: damage detected"
103    );
104
105    // Step 1: Load recovery blocks
106    let recovery_blocks =
107        load_recovery_blocks(dir, &file_set.recovery_set_id, file_set.slice_size);
108
109    if (recovery_blocks.len() as u32) < blocks_needed {
110        return Err(RepairError::InsufficientRecovery {
111            needed: blocks_needed,
112            available: recovery_blocks.len() as u32,
113        });
114    }
115
116    // Step 3: Map files to a global block index
117    // Each file's blocks are numbered sequentially: file0_block0, file0_block1, ..., file1_block0, ...
118    let block_map = build_block_map(file_set);
119    let total_input_blocks = block_map.total_blocks as usize;
120
121    // Identify which global block indices are damaged/missing
122    let damaged_indices = find_damaged_block_indices(&verify_result, &block_map);
123    info!(
124        damaged_block_count = damaged_indices.len(),
125        total_input_blocks,
126        "Mapped damaged blocks to global indices"
127    );
128
129    // Step 4: Build the decode matrix
130    // Select recovery blocks to use (we need exactly damaged_indices.len())
131    let recovery_to_use: Vec<&RecoveryBlock> = recovery_blocks
132        .iter()
133        .take(damaged_indices.len())
134        .collect();
135
136    let recovery_exponents: Vec<u32> = recovery_to_use.iter().map(|b| b.exponent).collect();
137
138    // Build the full encoding matrix, then select the rows we have
139    // (intact data rows + selected recovery rows)
140    let encoding_matrix =
141        GfMatrix::par2_encoding_matrix(total_input_blocks, &recovery_exponents);
142
143    // Build the "available" row selection:
144    // For each output position (0..total_input_blocks):
145    //   - If the block is intact, use the identity row (row = block_index)
146    //   - If the block is damaged, use a recovery row
147    let mut available_rows: Vec<usize> = Vec::with_capacity(total_input_blocks);
148    let mut recovery_idx = 0;
149    for block_idx in 0..total_input_blocks {
150        if damaged_indices.contains(&block_idx) {
151            // Use recovery row: these are at index total_input_blocks + recovery_idx
152            available_rows.push(total_input_blocks + recovery_idx);
153            recovery_idx += 1;
154        } else {
155            // Use identity row (data is intact)
156            available_rows.push(block_idx);
157        }
158    }
159
160    let decode_submatrix = encoding_matrix.select_rows(&available_rows);
161    let inverse = decode_submatrix
162        .invert()
163        .ok_or(RepairError::SingularMatrix)?;
164
165    info!("Decode matrix inverted successfully");
166
167    // Step 5: Reconstruct damaged blocks using streaming I/O + parallel compute.
168    //
169    // Source-major approach: stream source blocks one at a time through a reader
170    // thread, applying each source to ALL damaged-block outputs in parallel.
171    // This overlaps I/O with compute (double-buffered via sync_channel) and
172    // reduces memory from O(total_blocks × slice) to O(damaged × slice).
173
174    let slice_size = file_set.slice_size as usize;
175    let num_damaged = damaged_indices.len();
176    let num_sources = total_input_blocks;
177
178    // Coefficient matrix: coeffs[dmg_i][src_idx]
179    let coeffs: Vec<Vec<u16>> = (0..num_damaged)
180        .map(|dmg_i| {
181            (0..num_sources)
182                .map(|src_idx| inverse.get(damaged_indices[dmg_i], src_idx))
183                .collect()
184        })
185        .collect();
186
187    // Pre-allocate output buffers (one per damaged block)
188    let mut outputs: Vec<Vec<u8>> = (0..num_damaged)
189        .map(|_| vec![0u8; slice_size])
190        .collect();
191
192    // Stream source blocks via a reader thread, overlap I/O with compute.
193    // sync_channel(2) gives double-buffering: reader can be 2 blocks ahead.
194    let read_error: Option<std::io::Error> = std::thread::scope(|scope| {
195        let (tx, rx) = std::sync::mpsc::sync_channel::<(usize, Vec<u8>)>(2);
196
197        let reader = scope.spawn({
198            let damaged_indices = &damaged_indices;
199            let recovery_to_use = &recovery_to_use;
200            let block_map = &block_map;
201            move || -> Result<(), std::io::Error> {
202                let mut recovery_idx = 0usize;
203                let mut file_handles: HashMap<String, std::fs::File> = HashMap::new();
204
205                for src_idx in 0..num_sources {
206                    let data = if damaged_indices.contains(&src_idx) {
207                        let d = recovery_to_use[recovery_idx].data.clone();
208                        recovery_idx += 1;
209                        d
210                    } else {
211                        read_source_block(
212                            dir,
213                            block_map,
214                            src_idx,
215                            slice_size,
216                            &mut file_handles,
217                        )?
218                    };
219
220                    if tx.send((src_idx, data)).is_err() {
221                        break; // Receiver dropped (e.g. panic on compute side)
222                    }
223                }
224                Ok(())
225            }
226        });
227
228        // Compute: apply each source to all damaged outputs in parallel.
229        // The source block (~768KB) stays hot in L3 as rayon threads share it.
230        for (src_idx, src_data) in rx {
231            outputs
232                .par_iter_mut()
233                .enumerate()
234                .for_each(|(dmg_i, dst)| {
235                    let coeff = coeffs[dmg_i][src_idx];
236                    if coeff != 0 {
237                        gf_simd::mul_add_buffer(dst, &src_data, coeff);
238                    }
239                });
240        }
241
242        // Collect reader result
243        match reader.join().unwrap() {
244            Ok(()) => None,
245            Err(e) => Some(e),
246        }
247    });
248
249    if let Some(e) = read_error {
250        return Err(RepairError::Io(e));
251    }
252
253    let repaired_blocks: Vec<(usize, Vec<u8>)> = damaged_indices
254        .iter()
255        .copied()
256        .zip(outputs)
257        .collect();
258
259    // Step 6: Write repaired blocks back to files
260    let mut files_touched = std::collections::HashSet::new();
261
262    for (global_idx, data) in &repaired_blocks {
263        let (filename, file_offset, write_len) =
264            block_map.global_to_file(*global_idx, slice_size);
265
266        let file_path = dir.join(&filename);
267        debug!(
268            filename,
269            global_block = global_idx,
270            offset = file_offset,
271            len = write_len,
272            "Writing repaired block"
273        );
274
275        // Create file if missing, open for write if exists
276        let mut f = std::fs::OpenOptions::new()
277            .create(true)
278            .write(true)
279            .open(&file_path)?;
280
281        // Ensure file is the right size (for missing files)
282        let expected_size = block_map
283            .files
284            .iter()
285            .find(|bf| bf.filename == filename)
286            .map(|bf| bf.file_size)
287            .unwrap_or(0);
288        let current_size = f.metadata()?.len();
289        if current_size < expected_size {
290            f.set_len(expected_size)?;
291        }
292
293        f.seek(SeekFrom::Start(file_offset as u64))?;
294        f.write_all(&data[..write_len])?;
295        files_touched.insert(filename.clone());
296    }
297
298    // Step 7: Re-verify
299    let re_verify = verify::verify(file_set, dir);
300    if re_verify.all_correct() {
301        info!(
302            blocks = repaired_blocks.len(),
303            files = files_touched.len(),
304            "Repair successful — all files verified"
305        );
306        Ok(RepairResult {
307            success: true,
308            blocks_repaired: repaired_blocks.len() as u32,
309            files_repaired: files_touched.len(),
310            message: "All files repaired and verified".to_string(),
311        })
312    } else {
313        Err(RepairError::VerifyFailed(format!("{re_verify}")))
314    }
315}
316
317// ---------------------------------------------------------------------------
318// Block mapping
319// ---------------------------------------------------------------------------
320
/// Maps between global block indices and per-file block positions.
///
/// Global indices number every file's blocks sequentially in file-ID order
/// (see `build_block_map`): file0_block0, file0_block1, ..., file1_block0, ...
struct BlockMap {
    /// Per-file block layout, in the same order blocks were numbered.
    files: Vec<BlockFile>,
    /// Total number of source blocks across all files.
    total_blocks: u32,
}
326
/// Block layout for one file within the global block numbering.
struct BlockFile {
    /// File name, joined onto the repair directory when opened.
    filename: String,
    /// File size in bytes; the final block may be shorter than one slice.
    file_size: u64,
    /// Number of slices this file occupies (ceiling of size / slice_size).
    block_count: u32,
    /// First global block index for this file.
    start_block: u32,
}
334
335fn build_block_map(file_set: &Par2FileSet) -> BlockMap {
336    let slice_size = file_set.slice_size;
337    let mut files = Vec::new();
338    let mut block_offset = 0u32;
339
340    // Sort files by file ID for deterministic ordering (same as par2cmdline)
341    let mut sorted_files: Vec<_> = file_set.files.values().collect();
342    sorted_files.sort_by_key(|f| f.file_id);
343
344    for f in sorted_files {
345        let block_count = if slice_size == 0 {
346            0
347        } else {
348            ((f.size + slice_size - 1) / slice_size) as u32
349        };
350        files.push(BlockFile {
351            filename: f.filename.clone(),
352            file_size: f.size,
353            block_count,
354            start_block: block_offset,
355        });
356        block_offset += block_count;
357    }
358
359    BlockMap {
360        files,
361        total_blocks: block_offset,
362    }
363}
364
365impl BlockMap {
366    /// Convert a global block index to (filename, file_byte_offset, bytes_to_write).
367    fn global_to_file(&self, global_idx: usize, slice_size: usize) -> (String, usize, usize) {
368        let global = global_idx as u32;
369        for f in &self.files {
370            if global >= f.start_block && global < f.start_block + f.block_count {
371                let local_block = (global - f.start_block) as usize;
372                let file_offset = local_block * slice_size;
373                // Last block may be shorter than slice_size
374                let remaining = f.file_size as usize - file_offset;
375                let write_len = remaining.min(slice_size);
376                return (f.filename.clone(), file_offset, write_len);
377            }
378        }
379        panic!("Global block index {global_idx} out of range");
380    }
381}
382
383fn find_damaged_block_indices(verify_result: &VerifyResult, block_map: &BlockMap) -> Vec<usize> {
384    let mut indices = Vec::new();
385
386    for damaged in &verify_result.damaged {
387        if let Some(bf) = block_map.files.iter().find(|f| f.filename == damaged.filename) {
388            if damaged.damaged_block_indices.is_empty() {
389                // No per-block info — assume all blocks damaged
390                for i in 0..bf.block_count {
391                    indices.push((bf.start_block + i) as usize);
392                }
393            } else {
394                // Use precise per-block damage info
395                for &local_idx in &damaged.damaged_block_indices {
396                    indices.push((bf.start_block + local_idx) as usize);
397                }
398            }
399        }
400    }
401
402    for missing in &verify_result.missing {
403        if let Some(bf) = block_map.files.iter().find(|f| f.filename == missing.filename) {
404            for i in 0..bf.block_count {
405                indices.push((bf.start_block + i) as usize);
406            }
407        }
408    }
409
410    indices.sort();
411    indices.dedup();
412    indices
413}
414
415/// Read a single source block from disk, reusing file handles.
416fn read_source_block(
417    dir: &Path,
418    block_map: &BlockMap,
419    global_idx: usize,
420    slice_size: usize,
421    file_handles: &mut HashMap<String, std::fs::File>,
422) -> std::io::Result<Vec<u8>> {
423    let (filename, file_offset, _) = block_map.global_to_file(global_idx, slice_size);
424
425    let handle = match file_handles.entry(filename.clone()) {
426        Entry::Occupied(e) => e.into_mut(),
427        Entry::Vacant(e) => {
428            let path = dir.join(&filename);
429            e.insert(std::fs::File::open(&path)?)
430        }
431    };
432    handle.seek(SeekFrom::Start(file_offset as u64))?;
433
434    let mut buf = vec![0u8; slice_size]; // zero-initialized for last-block padding
435    let mut total = 0;
436    while total < slice_size {
437        match handle.read(&mut buf[total..]) {
438            Ok(0) => break,
439            Ok(n) => total += n,
440            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
441            Err(e) => return Err(e),
442        }
443    }
444    Ok(buf)
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::gf;

    /// Test that the basic RS encode→decode round-trip works.
    /// 2 data blocks, 2 recovery blocks, lose both data blocks, recover.
    #[test]
    fn test_rs_roundtrip_simple() {
        // 2 input "blocks" of 4 bytes each (2 u16 values)
        let input0: Vec<u8> = vec![0x01, 0x00, 0x02, 0x00]; // [1, 2] as u16 LE
        let input1: Vec<u8> = vec![0x03, 0x00, 0x04, 0x00]; // [3, 4] as u16 LE

        let input_count = 2;
        let recovery_exponents = vec![0u32, 1u32];

        // Build encoding matrix
        let enc = GfMatrix::par2_encoding_matrix(input_count, &recovery_exponents);
        // enc is 4x2:
        // Row 0: [1, 0]  (identity for input 0)
        // Row 1: [0, 1]  (identity for input 1)
        // Row 2: [2^(0*0), 2^(0*1)] = [1, 1]  (recovery exp=0)
        // Row 3: [2^(1*0), 2^(1*1)] = [1, 2]  (recovery exp=1)

        println!("Encoding matrix:");
        for r in 0..enc.rows {
            for c in 0..enc.cols {
                print!("{:5} ", enc.get(r, c));
            }
            println!();
        }

        // Compute recovery blocks: for each u16 position, recovery[e] = Σ input[i] * enc[e+2][i]
        let slice_size = 4;
        let u16_per_slice = slice_size / 2;
        let inputs = [&input0, &input1];

        let mut recovery0 = vec![0u8; slice_size];
        let mut recovery1 = vec![0u8; slice_size];

        // Encode: GF(2^16) dot product of each recovery row with the inputs,
        // one u16 lane at a time (PAR2 slices are little-endian u16 words).
        for pos in 0..u16_per_slice {
            let off = pos * 2;
            let mut r0: u16 = 0;
            let mut r1: u16 = 0;
            for (i, inp) in inputs.iter().enumerate() {
                let val = u16::from_le_bytes([inp[off], inp[off + 1]]);
                r0 = gf::add(r0, gf::mul(enc.get(2, i), val));
                r1 = gf::add(r1, gf::mul(enc.get(3, i), val));
            }
            // Store results back as little-endian bytes.
            recovery0[off] = r0 as u8;
            recovery0[off + 1] = (r0 >> 8) as u8;
            recovery1[off] = r1 as u8;
            recovery1[off + 1] = (r1 >> 8) as u8;
        }

        println!("Recovery 0: {:?}", recovery0);
        println!("Recovery 1: {:?}", recovery1);

        // Now "lose" both input blocks. We have recovery rows 2 and 3.
        // Select those rows and invert to decode.
        let decode_sub = enc.select_rows(&[2, 3]);
        println!("Decode submatrix:");
        for r in 0..decode_sub.rows {
            for c in 0..decode_sub.cols {
                print!("{:5} ", decode_sub.get(r, c));
            }
            println!();
        }

        let inv = decode_sub.invert().expect("Should be invertible");
        println!("Inverse:");
        for r in 0..inv.rows {
            for c in 0..inv.cols {
                print!("{:5} ", inv.get(r, c));
            }
            println!();
        }

        // Reconstruct: for each u16 position
        let available = [&recovery0, &recovery1];
        let mut result0 = vec![0u8; slice_size];
        let mut result1 = vec![0u8; slice_size];

        // Decode: apply the inverse matrix to the recovery data, lane by lane;
        // row i of the inverse reconstructs original input block i.
        for pos in 0..u16_per_slice {
            let off = pos * 2;
            let mut out0: u16 = 0;
            let mut out1: u16 = 0;
            for (src_idx, src) in available.iter().enumerate() {
                let val = u16::from_le_bytes([src[off], src[off + 1]]);
                out0 = gf::add(out0, gf::mul(inv.get(0, src_idx), val));
                out1 = gf::add(out1, gf::mul(inv.get(1, src_idx), val));
            }
            result0[off] = out0 as u8;
            result0[off + 1] = (out0 >> 8) as u8;
            result1[off] = out1 as u8;
            result1[off + 1] = (out1 >> 8) as u8;
        }

        assert_eq!(result0, input0, "Recovered block 0 should match original");
        assert_eq!(result1, input1, "Recovered block 1 should match original");
    }
}