// idb/cli/recover.rs — implementation of the `inno recover` subcommand.
1use std::io::Write;
2
3use byteorder::{BigEndian, ByteOrder};
4use colored::Colorize;
5use rayon::prelude::*;
6use serde::Serialize;
7
8use crate::cli::{create_progress_bar, wprintln};
9use crate::innodb::checksum::{validate_checksum, validate_lsn, ChecksumAlgorithm};
10use crate::innodb::constants::*;
11use crate::innodb::corruption::{classify_corruption, CorruptionPattern};
12use crate::innodb::page::FilHeader;
13use crate::innodb::page_types::PageType;
14use crate::innodb::record::walk_compact_records;
15use crate::innodb::tablespace::Tablespace;
16use crate::innodb::write;
17use crate::IdbError;
18
/// Options for the `inno recover` subcommand.
///
/// Populated by the CLI argument parser and consumed by [`execute`].
pub struct RecoverOptions {
    /// Path to the InnoDB tablespace file (.ibd).
    pub file: String,
    /// Analyze a single page instead of full scan.
    pub page: Option<u64>,
    /// Show per-page details.
    pub verbose: bool,
    /// Emit output as JSON.
    pub json: bool,
    /// Extract records from corrupt pages with valid headers.
    pub force: bool,
    /// Override the auto-detected page size.
    pub page_size: Option<u32>,
    /// Path to MySQL keyring file for decrypting encrypted tablespaces.
    pub keyring: Option<String>,
    /// Number of threads for parallel processing (0 = auto-detect).
    pub threads: usize,
    /// Use memory-mapped I/O for file access.
    pub mmap: bool,
    /// Stream results incrementally for lower memory usage.
    pub streaming: bool,
    /// Path to write a new tablespace from recoverable pages.
    pub rebuild: Option<String>,
}
44
/// Page integrity status.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
enum PageStatus {
    /// Checksum and LSN both validated.
    Intact,
    /// Checksum and/or LSN validation failed.
    Corrupt,
    /// Every byte is zero (allocated but unused page).
    Empty,
    /// FIL header could not be parsed, or the page lies past end of file.
    Unreadable,
}
54
55impl PageStatus {
56    fn label(self) -> &'static str {
57        match self {
58            PageStatus::Intact => "intact",
59            PageStatus::Corrupt => "CORRUPT",
60            PageStatus::Empty => "empty",
61            PageStatus::Unreadable => "UNREADABLE",
62        }
63    }
64}
65
/// Top-level JSON output for the recovery report.
#[derive(Serialize)]
struct RecoverReport {
    /// Path of the analyzed tablespace file.
    file: String,
    /// File size in bytes.
    file_size: u64,
    /// Page size (bytes) used for the scan.
    page_size: u32,
    /// How the page size was determined; omitted when auto-detection succeeded.
    #[serde(skip_serializing_if = "Option::is_none")]
    page_size_source: Option<String>,
    /// Number of pages analyzed.
    total_pages: u64,
    /// Page counts per status category.
    summary: RecoverSummary,
    /// Records countable on recoverable INDEX pages.
    recoverable_records: u64,
    /// Additional records reachable with `--force`; omitted when not applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    force_recoverable_records: Option<u64>,
    /// Per-page details; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pages: Vec<PageRecoveryInfo>,
}
82
/// Status counts by category.
#[derive(Serialize)]
struct RecoverSummary {
    /// Pages with valid checksum and LSN.
    intact: u64,
    /// Pages failing checksum and/or LSN validation.
    corrupt: u64,
    /// All-zero pages.
    empty: u64,
    /// Pages whose FIL header could not be parsed or that lie past EOF.
    unreadable: u64,
}
91
/// Per-page recovery info for JSON output.
#[derive(Serialize)]
struct PageRecoveryInfo {
    /// Page number within the tablespace.
    page_number: u64,
    /// Integrity classification of this page.
    status: PageStatus,
    /// Human-readable page type name from the FIL header.
    page_type: String,
    /// Whether the page checksum validated.
    checksum_valid: bool,
    /// Whether the header/trailer LSN pair validated.
    lsn_valid: bool,
    /// LSN from the FIL header.
    lsn: u64,
    /// Name of the classified corruption pattern; only set for corrupt pages.
    #[serde(skip_serializing_if = "Option::is_none")]
    corruption_pattern: Option<String>,
    /// Record count; only set for walkable INDEX pages.
    #[serde(skip_serializing_if = "Option::is_none")]
    record_count: Option<usize>,
    /// Extracted records; populated only for verbose JSON output.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    records: Vec<RecoveredRecord>,
}
108
/// A single recovered record for verbose JSON output.
#[derive(Serialize)]
struct RecoveredRecord {
    /// Byte offset of the record origin within the page.
    offset: usize,
    /// Heap number from the record header.
    heap_no: u16,
    /// Whether the record carries the delete mark.
    delete_mark: bool,
    /// Record payload bytes encoded as lowercase hex.
    data_hex: String,
}
117
/// Computed statistics from page analysis, used by output functions.
struct RecoverStats {
    /// File size in bytes.
    file_size: u64,
    /// Page size in bytes.
    page_size: u32,
    /// How the page size was determined (None = auto-detected).
    page_size_source: Option<String>,
    /// Number of pages scanned.
    scan_count: u64,
    /// Count of intact pages.
    intact: u64,
    /// Count of corrupt pages.
    corrupt: u64,
    /// Count of all-zero pages.
    empty: u64,
    /// Count of unreadable pages.
    unreadable: u64,
    /// Records found on intact INDEX pages.
    total_records: u64,
    /// Records found on corrupt INDEX pages (recoverable only via --force).
    corrupt_records: u64,
    /// Page numbers of corrupt pages, in scan order.
    corrupt_page_numbers: Vec<u64>,
    /// Total INDEX pages seen.
    index_pages_total: u64,
    /// INDEX pages whose records are considered recoverable.
    index_pages_recoverable: u64,
    /// (pattern name, occurrence count), sorted most frequent first.
    corruption_patterns: Vec<(String, u64)>,
}
135
/// Internal per-page analysis result.
struct PageAnalysis {
    /// Page number within the tablespace.
    page_number: u64,
    /// Integrity classification.
    status: PageStatus,
    /// Page type parsed from the FIL header.
    page_type: PageType,
    /// Whether the checksum validated.
    checksum_valid: bool,
    /// Whether the LSN pair validated.
    lsn_valid: bool,
    /// LSN from the FIL header (0 for empty/unreadable pages).
    lsn: u64,
    /// Classified corruption pattern; set only when status is Corrupt.
    corruption_pattern: Option<CorruptionPattern>,
    /// Record count; set only for walkable INDEX pages.
    record_count: Option<usize>,
    /// Extracted records; populated only for verbose JSON output.
    records: Vec<RecoveredRecord>,
}
148
149/// Try to open the tablespace, with smart page size fallback when page 0 is damaged.
150fn open_tablespace(
151    file: &str,
152    page_size_override: Option<u32>,
153    use_mmap: bool,
154    writer: &mut dyn Write,
155) -> Result<(Tablespace, Option<String>), IdbError> {
156    if let Some(ps) = page_size_override {
157        let ts = crate::cli::open_tablespace(file, Some(ps), use_mmap)?;
158        return Ok((ts, Some("user-specified".to_string())));
159    }
160
161    match crate::cli::open_tablespace(file, None, use_mmap) {
162        Ok(ts) => Ok((ts, None)),
163        Err(_) => {
164            // Page 0 may be corrupt — try common page sizes
165            let candidates = [
166                SIZE_PAGE_16K,
167                SIZE_PAGE_8K,
168                SIZE_PAGE_4K,
169                SIZE_PAGE_32K,
170                SIZE_PAGE_64K,
171            ];
172
173            let file_size = std::fs::metadata(file)
174                .map_err(|e| IdbError::Io(format!("Cannot stat {}: {}", file, e)))?
175                .len();
176
177            for &ps in &candidates {
178                if file_size >= ps as u64 && file_size % ps as u64 == 0 {
179                    if let Ok(ts) = crate::cli::open_tablespace(file, Some(ps), use_mmap) {
180                        let _ = wprintln!(
181                            writer,
182                            "Warning: auto-detect failed, using page size {} (file size divisible)",
183                            ps
184                        );
185                        return Ok((ts, Some(format!("fallback ({})", ps))));
186                    }
187                }
188            }
189
190            // Last resort: default 16K
191            let ts = crate::cli::open_tablespace(file, Some(SIZE_PAGE_DEFAULT), use_mmap)?;
192            let _ = wprintln!(
193                writer,
194                "Warning: using default page size {} (no size divides evenly)",
195                SIZE_PAGE_DEFAULT
196            );
197            Ok((ts, Some("default-fallback".to_string())))
198        }
199    }
200}
201
202/// Analyze a single page and return its status and recovery info.
203fn analyze_page(
204    page_data: &[u8],
205    page_num: u64,
206    page_size: u32,
207    force: bool,
208    verbose_json: bool,
209    vendor_info: Option<&crate::innodb::vendor::VendorInfo>,
210) -> PageAnalysis {
211    // Check all-zeros (empty/allocated page)
212    if page_data.iter().all(|&b| b == 0) {
213        return PageAnalysis {
214            page_number: page_num,
215            status: PageStatus::Empty,
216            page_type: PageType::Allocated,
217            checksum_valid: true,
218            lsn_valid: true,
219            lsn: 0,
220            corruption_pattern: None,
221            record_count: None,
222            records: Vec::new(),
223        };
224    }
225
226    // Parse FIL header
227    let header = match FilHeader::parse(page_data) {
228        Some(h) => h,
229        None => {
230            return PageAnalysis {
231                page_number: page_num,
232                status: PageStatus::Unreadable,
233                page_type: PageType::Unknown(0),
234                checksum_valid: false,
235                lsn_valid: false,
236                lsn: 0,
237                corruption_pattern: None,
238                record_count: None,
239                records: Vec::new(),
240            };
241        }
242    };
243
244    let csum_result = validate_checksum(page_data, page_size, vendor_info);
245    let lsn_valid = validate_lsn(page_data, page_size);
246    let status = if csum_result.valid && lsn_valid {
247        PageStatus::Intact
248    } else {
249        PageStatus::Corrupt
250    };
251
252    // Classify corruption pattern on corrupt pages
253    let corruption_pattern = if status == PageStatus::Corrupt {
254        Some(classify_corruption(page_data, page_size))
255    } else {
256        None
257    };
258
259    // Count records on INDEX pages
260    let (record_count, records) =
261        if header.page_type == PageType::Index && (status == PageStatus::Intact || force) {
262            let recs = walk_compact_records(page_data);
263            let count = recs.len();
264            let recovered = if verbose_json {
265                extract_records(page_data, &recs, page_size)
266            } else {
267                Vec::new()
268            };
269            (Some(count), recovered)
270        } else {
271            (None, Vec::new())
272        };
273
274    PageAnalysis {
275        page_number: page_num,
276        status,
277        page_type: header.page_type,
278        checksum_valid: csum_result.valid,
279        lsn_valid,
280        lsn: header.lsn,
281        corruption_pattern,
282        record_count,
283        records,
284    }
285}
286
/// Encode bytes as a lowercase hex string.
fn to_hex(data: &[u8]) -> String {
    data.iter().map(|b| format!("{:02x}", b)).collect()
}
296
297/// Extract raw record bytes as hex from an INDEX page.
298fn extract_records(
299    page_data: &[u8],
300    recs: &[crate::innodb::record::RecordInfo],
301    page_size: u32,
302) -> Vec<RecoveredRecord> {
303    let ps = page_size as usize;
304    let data_end = ps - SIZE_FIL_TRAILER;
305
306    recs.iter()
307        .enumerate()
308        .map(|(i, rec)| {
309            let start = rec.offset;
310            let end = if i + 1 < recs.len() {
311                // Next record's origin minus its extra header
312                recs[i + 1].offset.saturating_sub(REC_N_NEW_EXTRA_BYTES)
313            } else {
314                // Use heap top or end of data area
315                data_end
316            };
317
318            let end = end.min(data_end);
319            let data = if start < end && end <= page_data.len() {
320                &page_data[start..end]
321            } else {
322                &[]
323            };
324
325            RecoveredRecord {
326                offset: rec.offset,
327                heap_no: rec.header.heap_no(),
328                delete_mark: rec.header.delete_mark(),
329                data_hex: to_hex(data),
330            }
331        })
332        .collect()
333}
334
/// Run the recovery analysis and output results.
///
/// Dispatches to [`execute_streaming`] when `--streaming` is set and no
/// single page was requested; otherwise reads the whole file into memory
/// and analyzes all pages in parallel. Optionally rebuilds a cleaned
/// tablespace (`--rebuild`) before emitting the report.
///
/// # Errors
///
/// Returns an error when the tablespace cannot be opened, decryption setup
/// fails, the requested `--page` is out of range, or report/rebuild I/O fails.
pub fn execute(opts: &RecoverOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
    let (mut ts, page_size_source) =
        open_tablespace(&opts.file, opts.page_size, opts.mmap, writer)?;

    // Optional decryption via a MySQL keyring file.
    if let Some(ref keyring_path) = opts.keyring {
        crate::cli::setup_decryption(&mut ts, keyring_path)?;
    }

    let page_size = ts.page_size();
    let page_count = ts.page_count();
    let file_size = ts.file_size();

    // Per-record hex extraction is only wanted for verbose JSON output.
    let verbose_json = opts.verbose && opts.json;

    // Determine which pages to analyze: a single page (--page) or the whole file.
    let (start_page, end_page) = match opts.page {
        Some(p) => {
            if p >= page_count {
                return Err(IdbError::Parse(format!(
                    "Page {} out of range (tablespace has {} pages)",
                    p, page_count
                )));
            }
            (p, p + 1)
        }
        None => (0, page_count),
    };
    let scan_count = end_page - start_page;

    // Streaming mode: process one page at a time, output immediately.
    // (A single-page request takes the in-memory path below instead.)
    if opts.streaming && opts.page.is_none() {
        return execute_streaming(
            opts,
            &mut ts,
            page_size,
            file_size,
            page_size_source,
            scan_count,
            verbose_json,
            writer,
        );
    }

    // Read all pages into memory for parallel processing
    let all_data = ts.read_all_pages()?;
    let ps = page_size as usize;
    // Cloned so the parallel closure below can capture it without borrowing `ts`.
    let vendor_info = ts.vendor_info().clone();

    // Progress bar only for multi-page, text-mode scans.
    let pb = if !opts.json && scan_count > 1 {
        Some(create_progress_bar(scan_count, "pages"))
    } else {
        None
    };

    // Analyze pages in parallel
    let force = opts.force;
    let analyses: Vec<PageAnalysis> = (start_page..end_page)
        .into_par_iter()
        .map(|page_num| {
            let offset = page_num as usize * ps;
            // A page that extends past the buffer end is unreadable.
            if offset + ps > all_data.len() {
                return PageAnalysis {
                    page_number: page_num,
                    status: PageStatus::Unreadable,
                    page_type: PageType::Unknown(0),
                    checksum_valid: false,
                    lsn_valid: false,
                    lsn: 0,
                    corruption_pattern: None,
                    record_count: None,
                    records: Vec::new(),
                };
            }
            let page_data = &all_data[offset..offset + ps];
            analyze_page(
                page_data,
                page_num,
                page_size,
                force,
                verbose_json,
                Some(&vendor_info),
            )
        })
        .collect();

    // The bar is not ticked per page; snap it to the end once the map completes.
    if let Some(pb) = pb {
        pb.set_position(scan_count);
        pb.finish_and_clear();
    }

    // Compute summary
    let mut intact = 0u64;
    let mut corrupt = 0u64;
    let mut empty = 0u64;
    let mut unreadable = 0u64;
    let mut total_records = 0u64;
    let mut corrupt_records = 0u64;
    let mut corrupt_page_numbers = Vec::new();
    let mut index_pages_total = 0u64;
    let mut index_pages_recoverable = 0u64;
    let mut pattern_counts: std::collections::HashMap<String, u64> =
        std::collections::HashMap::new();

    for a in &analyses {
        match a.status {
            PageStatus::Intact => intact += 1,
            PageStatus::Corrupt => {
                corrupt += 1;
                corrupt_page_numbers.push(a.page_number);
                if let Some(pattern) = a.corruption_pattern {
                    *pattern_counts
                        .entry(pattern.name().to_string())
                        .or_insert(0) += 1;
                }
            }
            PageStatus::Empty => empty += 1,
            PageStatus::Unreadable => unreadable += 1,
        }

        // Record totals are split by page health: records on intact pages are
        // directly recoverable, the rest only via --force.
        if a.page_type == PageType::Index {
            index_pages_total += 1;
            if a.status == PageStatus::Intact {
                index_pages_recoverable += 1;
            }
            if let Some(count) = a.record_count {
                if a.status == PageStatus::Intact {
                    total_records += count as u64;
                } else {
                    corrupt_records += count as u64;
                }
            }
        }
    }

    // If --force, corrupt INDEX pages with records are also recoverable
    if opts.force {
        for a in &analyses {
            if a.page_type == PageType::Index
                && a.status == PageStatus::Corrupt
                && a.record_count.is_some()
            {
                index_pages_recoverable += 1;
            }
        }
    }

    // Most frequent corruption pattern first.
    let mut corruption_patterns: Vec<(String, u64)> = pattern_counts.into_iter().collect();
    corruption_patterns.sort_by(|a, b| b.1.cmp(&a.1));

    let stats = RecoverStats {
        file_size,
        page_size,
        page_size_source,
        scan_count,
        intact,
        corrupt,
        empty,
        unreadable,
        total_records,
        corrupt_records,
        corrupt_page_numbers,
        index_pages_total,
        index_pages_recoverable,
        corruption_patterns,
    };

    // Execute rebuild if requested
    if let Some(ref rebuild_path) = opts.rebuild {
        execute_rebuild(
            rebuild_path,
            &all_data,
            &analyses,
            page_size,
            opts.force,
            &vendor_info,
            writer,
            opts.json,
        )?;
    }

    if opts.json {
        output_json(opts, &analyses, &stats, writer)
    } else {
        output_text(opts, &analyses, &stats, writer)
    }
}
522
/// Streaming mode: process pages one at a time via `for_each_page()`, writing
/// per-page results immediately and accumulating running counters for the summary.
/// JSON output uses NDJSON (one JSON object per line per page, plus a final summary line).
///
/// Unlike the parallel path in [`execute`], only one page is held in memory
/// at a time, so peak memory stays low at the cost of single-threaded analysis.
#[allow(clippy::too_many_arguments)]
fn execute_streaming(
    opts: &RecoverOptions,
    ts: &mut Tablespace,
    page_size: u32,
    file_size: u64,
    page_size_source: Option<String>,
    scan_count: u64,
    verbose_json: bool,
    writer: &mut dyn Write,
) -> Result<(), IdbError> {
    let force = opts.force;
    // Cloned so the page callback below can capture it without borrowing `ts`.
    let vendor_info = ts.vendor_info().clone();

    // Running counters, updated incrementally by the per-page callback.
    let mut intact = 0u64;
    let mut corrupt = 0u64;
    let mut empty = 0u64;
    let mut unreadable = 0u64;
    let mut total_records = 0u64;
    let mut corrupt_records = 0u64;
    let mut corrupt_page_numbers: Vec<u64> = Vec::new();
    let mut index_pages_total = 0u64;
    let mut index_pages_recoverable = 0u64;
    let mut pattern_counts: std::collections::HashMap<String, u64> =
        std::collections::HashMap::new();

    // Text mode prints the report header up front, before any per-page lines.
    if !opts.json {
        wprintln!(writer, "Recovery Analysis: {}", opts.file)?;
        wprintln!(
            writer,
            "File size: {} bytes ({} pages x {} bytes)",
            file_size,
            scan_count,
            page_size
        )?;
        let source_note = match &page_size_source {
            Some(s) => format!(" ({})", s),
            None => " (auto-detected)".to_string(),
        };
        wprintln!(writer, "Page size: {}{}", page_size, source_note)?;
        wprintln!(writer)?;
    }

    ts.for_each_page(|page_num, page_data| {
        let a = analyze_page(
            page_data,
            page_num,
            page_size,
            force,
            verbose_json,
            Some(&vendor_info),
        );

        // Update running counters
        match a.status {
            PageStatus::Intact => intact += 1,
            PageStatus::Corrupt => {
                corrupt += 1;
                corrupt_page_numbers.push(a.page_number);
                if let Some(pattern) = a.corruption_pattern {
                    *pattern_counts
                        .entry(pattern.name().to_string())
                        .or_insert(0) += 1;
                }
            }
            PageStatus::Empty => empty += 1,
            PageStatus::Unreadable => unreadable += 1,
        }

        // Record totals split by page health; with --force, corrupt INDEX
        // pages that still have walkable records also count as recoverable.
        if a.page_type == PageType::Index {
            index_pages_total += 1;
            if a.status == PageStatus::Intact {
                index_pages_recoverable += 1;
            }
            if force && a.status == PageStatus::Corrupt && a.record_count.is_some() {
                index_pages_recoverable += 1;
            }
            if let Some(count) = a.record_count {
                if a.status == PageStatus::Intact {
                    total_records += count as u64;
                } else {
                    corrupt_records += count as u64;
                }
            }
        }

        if opts.json {
            // NDJSON: emit per-page objects only in verbose mode; the final
            // summary line below is always emitted.
            if opts.verbose {
                let info = PageRecoveryInfo {
                    page_number: a.page_number,
                    status: a.status,
                    page_type: a.page_type.name().to_string(),
                    checksum_valid: a.checksum_valid,
                    lsn_valid: a.lsn_valid,
                    lsn: a.lsn,
                    corruption_pattern: a.corruption_pattern.map(|p| p.name().to_string()),
                    record_count: a.record_count,
                    records: a.records,
                };
                let line = serde_json::to_string(&info)
                    .map_err(|e| IdbError::Parse(format!("JSON error: {}", e)))?;
                wprintln!(writer, "{}", line)?;
            }
        } else if opts.verbose {
            // Text: per-page detail; damaged statuses are highlighted in red.
            let status_str = match a.status {
                PageStatus::Intact => a.status.label().to_string(),
                PageStatus::Corrupt => format!("{}", a.status.label().red()),
                PageStatus::Empty => a.status.label().to_string(),
                PageStatus::Unreadable => format!("{}", a.status.label().red()),
            };

            let mut line = format!(
                "Page {:>4}: {:<14} {:<12} LSN={}",
                a.page_number,
                a.page_type.name(),
                status_str,
                a.lsn,
            );

            if let Some(count) = a.record_count {
                line.push_str(&format!("  records={}", count));
            }

            // Append the specific validation failures and classified pattern.
            if a.status == PageStatus::Corrupt {
                if !a.checksum_valid {
                    line.push_str("  checksum mismatch");
                }
                if !a.lsn_valid {
                    line.push_str("  LSN mismatch");
                }
                if let Some(pattern) = a.corruption_pattern {
                    line.push_str(&format!("  [{}]", pattern.name()));
                }
            }

            wprintln!(writer, "{}", line)?;
        }

        Ok(())
    })?;

    // Output summary: most frequent corruption pattern first.
    let mut corruption_patterns: Vec<(String, u64)> = pattern_counts.into_iter().collect();
    corruption_patterns.sort_by(|a, b| b.1.cmp(&a.1));

    let stats = RecoverStats {
        file_size,
        page_size,
        page_size_source,
        scan_count,
        intact,
        corrupt,
        empty,
        unreadable,
        total_records,
        corrupt_records,
        corrupt_page_numbers,
        index_pages_total,
        index_pages_recoverable,
        corruption_patterns,
    };

    if opts.json {
        // Emit a final summary line as NDJSON. With --force, corrupt-page
        // records fold into the recoverable total; otherwise they are
        // reported separately as force-recoverable potential.
        let all_records = stats.total_records + if opts.force { stats.corrupt_records } else { 0 };
        let force_recs = if stats.corrupt_records > 0 && !opts.force {
            Some(stats.corrupt_records)
        } else {
            None
        };

        let summary = serde_json::json!({
            "type": "summary",
            "file": opts.file,
            "file_size": stats.file_size,
            "page_size": stats.page_size,
            "page_size_source": stats.page_size_source,
            "total_pages": stats.scan_count,
            "summary": {
                "intact": stats.intact,
                "corrupt": stats.corrupt,
                "empty": stats.empty,
                "unreadable": stats.unreadable,
            },
            "recoverable_records": all_records,
            "force_recoverable_records": force_recs,
        });
        let line = serde_json::to_string(&summary)
            .map_err(|e| IdbError::Parse(format!("JSON error: {}", e)))?;
        wprintln!(writer, "{}", line)?;
    } else {
        // Print text summary (verbose per-page was already output above)
        if opts.verbose {
            wprintln!(writer)?;
        }
        output_text_summary(opts, &stats, writer)?;
    }

    Ok(())
}
729
730/// Rebuild a new tablespace from recoverable pages.
731///
732/// Collects intact pages (and corrupt pages if `--force`), renumbers them
733/// sequentially, builds a new page 0, recalculates all checksums, and writes
734/// to the output path. Returns (pages_written, pages_skipped).
735#[allow(clippy::too_many_arguments)]
736fn execute_rebuild(
737    output_path: &str,
738    all_data: &[u8],
739    analyses: &[PageAnalysis],
740    page_size: u32,
741    force: bool,
742    vendor_info: &crate::innodb::vendor::VendorInfo,
743    writer: &mut dyn Write,
744    json: bool,
745) -> Result<(u64, u64), IdbError> {
746    let ps = page_size as usize;
747    let mut collected_pages: Vec<Vec<u8>> = Vec::new();
748    let mut skipped = 0u64;
749
750    // Infer space_id and flags from page 0 if intact, else from first intact page
751    let mut space_id = 0u32;
752    let mut flags = 0u32;
753    let mut max_lsn = 0u64;
754    let mut found_metadata = false;
755
756    for a in analyses {
757        if a.status == PageStatus::Intact || (force && a.status == PageStatus::Corrupt) {
758            if !found_metadata {
759                let offset = a.page_number as usize * ps;
760                if offset + ps <= all_data.len() {
761                    let page_data = &all_data[offset..offset + ps];
762                    space_id = BigEndian::read_u32(&page_data[FIL_PAGE_SPACE_ID..]);
763                    // Read flags from page 0 if this is page 0
764                    if a.page_number == 0 {
765                        let fsp = FIL_PAGE_DATA;
766                        if page_data.len() > fsp + FSP_SPACE_FLAGS + 4 {
767                            flags = BigEndian::read_u32(&page_data[fsp + FSP_SPACE_FLAGS..]);
768                        }
769                    }
770                    found_metadata = true;
771                }
772            }
773            if a.lsn > max_lsn {
774                max_lsn = a.lsn;
775            }
776        }
777    }
778
779    // Collect recoverable pages (skip page 0 — we'll build a new one)
780    for a in analyses {
781        let include = match a.status {
782            PageStatus::Intact => true,
783            PageStatus::Corrupt if force => true,
784            _ => false,
785        };
786
787        if !include || a.page_number == 0 {
788            if a.status != PageStatus::Empty && a.page_number != 0 {
789                skipped += 1;
790            }
791            continue;
792        }
793
794        let offset = a.page_number as usize * ps;
795        if offset + ps > all_data.len() {
796            skipped += 1;
797            continue;
798        }
799
800        collected_pages.push(all_data[offset..offset + ps].to_vec());
801    }
802
803    // Detect algorithm
804    let algorithm = write::detect_algorithm(
805        if !all_data.is_empty() && all_data.len() >= ps {
806            &all_data[..ps]
807        } else {
808            &[]
809        },
810        page_size,
811        Some(vendor_info),
812    );
813    // Use CRC-32C as fallback if detection returned None
814    let algorithm = if algorithm == ChecksumAlgorithm::None {
815        ChecksumAlgorithm::Crc32c
816    } else {
817        algorithm
818    };
819
820    let total_pages = (collected_pages.len() + 1) as u32; // +1 for new page 0
821
822    // Build new page 0
823    let page0 = write::build_fsp_page(space_id, total_pages, flags, max_lsn, page_size, algorithm);
824
825    // Renumber collected pages and fix checksums
826    let mut output_pages: Vec<Vec<u8>> = Vec::with_capacity(total_pages as usize);
827    output_pages.push(page0);
828
829    for (i, mut page) in collected_pages.into_iter().enumerate() {
830        let new_page_num = (i + 1) as u32;
831        // Update FIL_PAGE_OFFSET (page number)
832        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], new_page_num);
833        // Recalculate checksum
834        write::fix_page_checksum(&mut page, page_size, algorithm);
835        output_pages.push(page);
836    }
837
838    // Write output
839    write::write_tablespace(output_path, &output_pages)?;
840
841    // Post-validate
842    let ts = Tablespace::open(output_path)?;
843    let output_count = ts.page_count();
844    let mut valid_count = 0u64;
845    for i in 0..output_count {
846        let page = write::read_page_raw(output_path, i, page_size)?;
847        if validate_checksum(&page, page_size, Some(vendor_info)).valid {
848            valid_count += 1;
849        }
850    }
851
852    let pages_written = output_pages.len() as u64;
853
854    if !json {
855        wprintln!(writer)?;
856        wprintln!(writer, "Rebuild Output: {}", output_path)?;
857        wprintln!(writer, "  Pages written:    {}", pages_written)?;
858        wprintln!(writer, "  Pages skipped:    {}", skipped)?;
859        wprintln!(
860            writer,
861            "  Post-validation:  {}/{} valid checksums",
862            valid_count,
863            output_count
864        )?;
865        if valid_count < output_count {
866            wprintln!(writer, "  Warning: some pages still have invalid checksums")?;
867        }
868    }
869
870    Ok((pages_written, skipped))
871}
872
873fn output_text(
874    opts: &RecoverOptions,
875    analyses: &[PageAnalysis],
876    stats: &RecoverStats,
877    writer: &mut dyn Write,
878) -> Result<(), IdbError> {
879    wprintln!(writer, "Recovery Analysis: {}", opts.file)?;
880    wprintln!(
881        writer,
882        "File size: {} bytes ({} pages x {} bytes)",
883        stats.file_size,
884        stats.scan_count,
885        stats.page_size
886    )?;
887
888    let source_note = match &stats.page_size_source {
889        Some(s) => format!(" ({})", s),
890        None => " (auto-detected)".to_string(),
891    };
892    wprintln!(writer, "Page size: {}{}", stats.page_size, source_note)?;
893    wprintln!(writer)?;
894
895    // Verbose: per-page detail
896    if opts.verbose {
897        for a in analyses {
898            let status_str = match a.status {
899                PageStatus::Intact => a.status.label().to_string(),
900                PageStatus::Corrupt => format!("{}", a.status.label().red()),
901                PageStatus::Empty => a.status.label().to_string(),
902                PageStatus::Unreadable => format!("{}", a.status.label().red()),
903            };
904
905            let mut line = format!(
906                "Page {:>4}: {:<14} {:<12} LSN={}",
907                a.page_number,
908                a.page_type.name(),
909                status_str,
910                a.lsn,
911            );
912
913            if let Some(count) = a.record_count {
914                line.push_str(&format!("  records={}", count));
915            }
916
917            if a.status == PageStatus::Corrupt {
918                if !a.checksum_valid {
919                    line.push_str("  checksum mismatch");
920                }
921                if !a.lsn_valid {
922                    line.push_str("  LSN mismatch");
923                }
924                if let Some(pattern) = a.corruption_pattern {
925                    line.push_str(&format!("  [{}]", pattern.name()));
926                }
927            }
928
929            wprintln!(writer, "{}", line)?;
930        }
931        wprintln!(writer)?;
932    }
933
934    output_text_summary(opts, stats, writer)
935}
936
937/// Print the text-mode recovery summary (shared by streaming and non-streaming paths).
938fn output_text_summary(
939    opts: &RecoverOptions,
940    stats: &RecoverStats,
941    writer: &mut dyn Write,
942) -> Result<(), IdbError> {
943    wprintln!(writer, "Page Status Summary:")?;
944    wprintln!(writer, "  Intact:      {:>4} pages", stats.intact)?;
945    if stats.corrupt > 0 {
946        let pages_str = if stats.corrupt_page_numbers.len() <= 10 {
947            let nums: Vec<String> = stats
948                .corrupt_page_numbers
949                .iter()
950                .map(|n| n.to_string())
951                .collect();
952            format!(" (pages {})", nums.join(", "))
953        } else {
954            format!(" ({} pages)", stats.corrupt)
955        };
956        wprintln!(
957            writer,
958            "  Corrupt:     {:>4} pages{}",
959            format!("{}", stats.corrupt).red(),
960            pages_str
961        )?;
962    } else {
963        wprintln!(writer, "  Corrupt:     {:>4} pages", stats.corrupt)?;
964    }
965    wprintln!(writer, "  Empty:       {:>4} pages", stats.empty)?;
966    if stats.unreadable > 0 {
967        wprintln!(
968            writer,
969            "  Unreadable:  {:>4} pages",
970            format!("{}", stats.unreadable).red()
971        )?;
972    } else {
973        wprintln!(writer, "  Unreadable:  {:>4} pages", stats.unreadable)?;
974    }
975    wprintln!(writer, "  Total:       {:>4} pages", stats.scan_count)?;
976    wprintln!(writer)?;
977
978    if !stats.corruption_patterns.is_empty() {
979        wprintln!(writer, "Corruption Patterns:")?;
980        for (name, count) in &stats.corruption_patterns {
981            let label = if *count == 1 { "page" } else { "pages" };
982            wprintln!(writer, "  {}: {} {}", name, count, label)?;
983        }
984        wprintln!(writer)?;
985    }
986
987    if stats.index_pages_total > 0 {
988        wprintln!(
989            writer,
990            "Recoverable INDEX Pages: {} of {}",
991            stats.index_pages_recoverable,
992            stats.index_pages_total
993        )?;
994        wprintln!(writer, "  Total user records: {}", stats.total_records)?;
995        if stats.corrupt_records > 0 && !opts.force {
996            wprintln!(
997                writer,
998                "  Records on corrupt pages: {} (use --force to include)",
999                stats.corrupt_records
1000            )?;
1001        } else if stats.corrupt_records > 0 {
1002            wprintln!(
1003                writer,
1004                "  Records on corrupt pages: {} (included with --force)",
1005                stats.corrupt_records
1006            )?;
1007        }
1008        wprintln!(writer)?;
1009    }
1010
1011    let total_non_empty = stats.intact + stats.corrupt + stats.unreadable;
1012    if total_non_empty > 0 {
1013        let pct = (stats.intact as f64 / total_non_empty as f64) * 100.0;
1014        wprintln!(writer, "Overall: {:.1}% of pages intact", pct)?;
1015    }
1016
1017    Ok(())
1018}
1019
1020fn output_json(
1021    opts: &RecoverOptions,
1022    analyses: &[PageAnalysis],
1023    stats: &RecoverStats,
1024    writer: &mut dyn Write,
1025) -> Result<(), IdbError> {
1026    let all_records = stats.total_records + if opts.force { stats.corrupt_records } else { 0 };
1027
1028    let pages: Vec<PageRecoveryInfo> = if opts.verbose {
1029        analyses
1030            .iter()
1031            .map(|a| PageRecoveryInfo {
1032                page_number: a.page_number,
1033                status: a.status,
1034                page_type: a.page_type.name().to_string(),
1035                checksum_valid: a.checksum_valid,
1036                lsn_valid: a.lsn_valid,
1037                lsn: a.lsn,
1038                corruption_pattern: a.corruption_pattern.map(|p| p.name().to_string()),
1039                record_count: a.record_count,
1040                records: a
1041                    .records
1042                    .iter()
1043                    .map(|r| RecoveredRecord {
1044                        offset: r.offset,
1045                        heap_no: r.heap_no,
1046                        delete_mark: r.delete_mark,
1047                        data_hex: r.data_hex.clone(),
1048                    })
1049                    .collect(),
1050            })
1051            .collect()
1052    } else {
1053        Vec::new()
1054    };
1055
1056    let force_recs = if stats.corrupt_records > 0 && !opts.force {
1057        Some(stats.corrupt_records)
1058    } else {
1059        None
1060    };
1061
1062    let report = RecoverReport {
1063        file: opts.file.clone(),
1064        file_size: stats.file_size,
1065        page_size: stats.page_size,
1066        page_size_source: stats.page_size_source.clone(),
1067        total_pages: stats.scan_count,
1068        summary: RecoverSummary {
1069            intact: stats.intact,
1070            corrupt: stats.corrupt,
1071            empty: stats.empty,
1072            unreadable: stats.unreadable,
1073        },
1074        recoverable_records: all_records,
1075        force_recoverable_records: force_recs,
1076        pages,
1077    };
1078
1079    let json = serde_json::to_string_pretty(&report)
1080        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
1081    wprintln!(writer, "{}", json)?;
1082
1083    Ok(())
1084}
1085
#[cfg(test)]
mod tests {
    use super::*;

    /// Labels must match exactly what the text output prints.
    #[test]
    fn test_page_status_label() {
        assert_eq!(PageStatus::Intact.label(), "intact");
        assert_eq!(PageStatus::Corrupt.label(), "CORRUPT");
        assert_eq!(PageStatus::Empty.label(), "empty");
        assert_eq!(PageStatus::Unreadable.label(), "UNREADABLE");
    }

    /// An all-zero page is classified Empty, with the Allocated page type.
    #[test]
    fn test_analyze_empty_page() {
        let page = vec![0u8; 16384];
        let result = analyze_page(&page, 0, 16384, false, false, None);
        assert_eq!(result.status, PageStatus::Empty);
        assert_eq!(result.page_type, PageType::Allocated);
    }

    /// A buffer shorter than the declared page size cannot be parsed at all.
    #[test]
    fn test_analyze_short_page_is_unreadable() {
        let page = vec![0xFF; 10];
        let result = analyze_page(&page, 0, 16384, false, false, None);
        assert_eq!(result.status, PageStatus::Unreadable);
    }

    /// A hand-built INDEX page with consistent LSN trailer and CRC-32C
    /// checksum should validate as Intact and yield a record count.
    #[test]
    fn test_analyze_valid_index_page() {
        use byteorder::{BigEndian, ByteOrder};

        let mut page = vec![0u8; 16384];
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 1);
        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 5000);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855); // INDEX
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], 1);

        // Trailer: the low 32 bits of the header LSN are echoed at trailer+4.
        let trailer = 16384 - SIZE_FIL_TRAILER;
        BigEndian::write_u32(&mut page[trailer + 4..], (5000u64 & 0xFFFFFFFF) as u32);

        // CRC-32C checksum: XOR of the header-span and body-span CRCs,
        // stored in the checksum field.
        let end = 16384 - SIZE_FIL_TRAILER;
        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);

        let result = analyze_page(&page, 1, 16384, false, false, None);
        assert_eq!(result.status, PageStatus::Intact);
        assert_eq!(result.page_type, PageType::Index);
        assert!(result.record_count.is_some());
    }

    /// A page with header data but a deliberately bogus checksum is Corrupt.
    #[test]
    fn test_analyze_corrupt_page() {
        use byteorder::{BigEndian, ByteOrder};

        let mut page = vec![0u8; 16384];
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 1);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 5000);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], 1);
        // Write a bogus checksum value (0xDEAD) that cannot match the page contents
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], 0xDEAD);

        let result = analyze_page(&page, 1, 16384, false, false, None);
        assert_eq!(result.status, PageStatus::Corrupt);
        // Without --force, no record count on corrupt pages
        assert!(result.record_count.is_none());
    }

    /// Same corrupt page as above, but with force=true records are still extracted.
    #[test]
    fn test_analyze_corrupt_page_with_force() {
        use byteorder::{BigEndian, ByteOrder};

        let mut page = vec![0u8; 16384];
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 1);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 5000);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], 1);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], 0xDEAD);

        let result = analyze_page(&page, 1, 16384, true, false, None);
        assert_eq!(result.status, PageStatus::Corrupt);
        // With --force, records are counted even on corrupt pages
        assert!(result.record_count.is_some());
    }
}
1175}