Skip to main content

idb/cli/
checksum.rs

1use std::io::Write;
2
3use colored::Colorize;
4use rayon::prelude::*;
5use serde::Serialize;
6
7use crate::cli::{create_progress_bar, wprintln};
8use crate::innodb::checksum::{validate_checksum, validate_lsn, ChecksumAlgorithm, ChecksumResult};
9use crate::innodb::page::FilHeader;
10use crate::IdbError;
11
/// Options for the `inno checksum` subcommand.
///
/// Derives `Debug` and `Clone` so callers can log the effective options or
/// hand a copy to another component without rebuilding the struct by hand.
#[derive(Debug, Clone)]
pub struct ChecksumOptions {
    /// Path to the InnoDB tablespace file (.ibd).
    pub file: String,
    /// Show per-page checksum details.
    pub verbose: bool,
    /// Emit output as JSON (checked before `csv` in non-streaming mode).
    pub json: bool,
    /// Output as CSV.
    pub csv: bool,
    /// Override the auto-detected page size; `None` keeps auto-detection.
    pub page_size: Option<u32>,
    /// Path to MySQL keyring file for decrypting encrypted tablespaces.
    pub keyring: Option<String>,
    /// Number of threads for parallel processing (0 = auto-detect).
    pub threads: usize,
    /// Use memory-mapped I/O for file access.
    pub mmap: bool,
    /// Stream results incrementally for lower memory usage.
    pub streaming: bool,
}
33
34#[derive(Serialize)]
35struct ChecksumSummaryJson {
36    file: String,
37    page_size: u32,
38    total_pages: u64,
39    empty_pages: u64,
40    valid_pages: u64,
41    invalid_pages: u64,
42    lsn_mismatches: u64,
43    #[serde(skip_serializing_if = "Vec::is_empty")]
44    pages: Vec<PageChecksumJson>,
45}
46
47#[derive(Serialize)]
48struct PageChecksumJson {
49    page_number: u64,
50    status: String,
51    algorithm: String,
52    stored_checksum: u32,
53    calculated_checksum: u32,
54    lsn_valid: bool,
55}
56
57/// Result of validating a single page's checksum, used for parallel processing.
58enum PageResult {
59    /// Page header could not be parsed.
60    ParseError,
61    /// Page is all zeros (empty/allocated).
62    Empty,
63    /// Page was validated successfully.
64    Validated {
65        csum_result: ChecksumResult,
66        lsn_valid: bool,
67    },
68}
69
70/// Validate a single page's checksum and LSN. Pure function safe for parallel execution.
71fn validate_page(
72    page_data: &[u8],
73    page_size: u32,
74    vendor_info: &crate::innodb::vendor::VendorInfo,
75) -> PageResult {
76    let header = match FilHeader::parse(page_data) {
77        Some(h) => h,
78        None => return PageResult::ParseError,
79    };
80
81    if header.checksum == 0 && page_data.iter().all(|&b| b == 0) {
82        return PageResult::Empty;
83    }
84
85    let csum_result = validate_checksum(page_data, page_size, Some(vendor_info));
86    let lsn_valid = validate_lsn(page_data, page_size);
87
88    PageResult::Validated {
89        csum_result,
90        lsn_valid,
91    }
92}
93
94/// Validate page checksums for every page in an InnoDB tablespace.
95///
96/// Iterates over all pages and validates the stored checksum (bytes 0-3 of the
97/// FIL header) against two algorithms: **CRC-32C** (MySQL 5.7.7+), which XORs
98/// two independent CRC-32C values computed over bytes \[4..26) and
99/// \[38..page_size-8); and **legacy InnoDB**, which uses `ut_fold_ulint_pair`
100/// with u32 wrapping arithmetic over the same two byte ranges. A page is
101/// considered valid if either algorithm matches the stored value.
102///
103/// Additionally checks **LSN consistency**: the low 32 bits of the header LSN
104/// (bytes 16-23) must match the LSN value in the 8-byte FIL trailer at the
105/// end of the page. All-zero pages are counted as empty and skipped entirely.
106///
107/// When the tablespace has more than one page, all page data is read into memory
108/// and checksums are validated in parallel using rayon. Results are collected in
109/// page order for deterministic output.
110///
111/// Prints a summary with total, empty, valid, and invalid page counts. In
112/// `--verbose` mode, every non-empty page is printed with its algorithm,
113/// stored and calculated checksum values, and LSN status. The process exits
114/// with code 1 if any page has an invalid checksum, making this suitable for
115/// scripted integrity checks.
116///
117/// **Note**: When `--streaming` is combined with `--json`, the output uses
118/// NDJSON (one JSON object per line) rather than a single JSON document.
119pub fn execute(opts: &ChecksumOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
120    let mut ts = crate::cli::open_tablespace(&opts.file, opts.page_size, opts.mmap)?;
121
122    if let Some(ref keyring_path) = opts.keyring {
123        crate::cli::setup_decryption(&mut ts, keyring_path)?;
124    }
125
126    let page_size = ts.page_size();
127    let page_count = ts.page_count();
128    let vendor_info = ts.vendor_info().clone();
129
130    // Streaming mode: process one page at a time, output immediately
131    if opts.streaming {
132        return execute_streaming(opts, &mut ts, page_size, page_count, &vendor_info, writer);
133    }
134
135    // Read all pages into memory for parallel processing
136    let all_data = ts.read_all_pages()?;
137    let ps = page_size as usize;
138
139    if opts.json {
140        return execute_json_parallel(
141            opts,
142            &all_data,
143            ps,
144            page_size,
145            page_count,
146            &vendor_info,
147            writer,
148        );
149    }
150
151    if opts.csv {
152        return execute_csv_parallel(&all_data, ps, page_size, page_count, &vendor_info, writer);
153    }
154
155    wprintln!(
156        writer,
157        "Validating checksums for {} ({} pages, page size {})...",
158        opts.file,
159        page_count,
160        page_size
161    )?;
162    wprintln!(writer)?;
163
164    // Create progress bar before parallel work so it tracks real progress
165    let pb = create_progress_bar(page_count, "pages");
166
167    // Process all pages in parallel
168    let results: Vec<(u64, PageResult)> = (0..page_count)
169        .into_par_iter()
170        .map(|page_num| {
171            let offset = page_num as usize * ps;
172            if offset + ps > all_data.len() {
173                pb.inc(1);
174                return (page_num, PageResult::ParseError);
175            }
176            let page_data = &all_data[offset..offset + ps];
177            let result = validate_page(page_data, page_size, &vendor_info);
178            pb.inc(1);
179            (page_num, result)
180        })
181        .collect();
182
183    pb.finish_and_clear();
184
185    // Output results sequentially in page order (rayon collect preserves order)
186    let mut valid_count = 0u64;
187    let mut invalid_count = 0u64;
188    let mut empty_count = 0u64;
189    let mut lsn_mismatch_count = 0u64;
190
191    for (page_num, result) in &results {
192        match result {
193            PageResult::ParseError => {
194                eprintln!("Page {}: Could not parse FIL header", page_num);
195                invalid_count += 1;
196            }
197            PageResult::Empty => {
198                empty_count += 1;
199                if opts.verbose {
200                    wprintln!(writer, "Page {}: EMPTY", page_num)?;
201                }
202            }
203            PageResult::Validated {
204                csum_result,
205                lsn_valid,
206            } => {
207                if csum_result.valid {
208                    valid_count += 1;
209                    if opts.verbose {
210                        wprintln!(
211                            writer,
212                            "Page {}: {} ({:?}, stored={}, calculated={})",
213                            page_num,
214                            "OK".green(),
215                            csum_result.algorithm,
216                            csum_result.stored_checksum,
217                            csum_result.calculated_checksum,
218                        )?;
219                    }
220                } else {
221                    invalid_count += 1;
222                    wprintln!(
223                        writer,
224                        "Page {}: {} checksum (stored={}, calculated={}, algorithm={:?})",
225                        page_num,
226                        "INVALID".red(),
227                        csum_result.stored_checksum,
228                        csum_result.calculated_checksum,
229                        csum_result.algorithm,
230                    )?;
231                }
232
233                if !lsn_valid {
234                    lsn_mismatch_count += 1;
235                    if csum_result.valid {
236                        wprintln!(
237                            writer,
238                            "Page {}: {} - header LSN low32 does not match trailer",
239                            page_num,
240                            "LSN MISMATCH".yellow(),
241                        )?;
242                    }
243                }
244            }
245        }
246    }
247
248    wprintln!(writer)?;
249    wprintln!(writer, "Summary:")?;
250    wprintln!(writer, "  Total pages: {}", page_count)?;
251    wprintln!(writer, "  Empty pages: {}", empty_count)?;
252    wprintln!(writer, "  Valid checksums: {}", valid_count)?;
253    if invalid_count > 0 {
254        wprintln!(
255            writer,
256            "  Invalid checksums: {}",
257            format!("{}", invalid_count).red()
258        )?;
259    } else {
260        wprintln!(
261            writer,
262            "  Invalid checksums: {}",
263            format!("{}", invalid_count).green()
264        )?;
265    }
266    if lsn_mismatch_count > 0 {
267        wprintln!(
268            writer,
269            "  LSN mismatches: {}",
270            format!("{}", lsn_mismatch_count).yellow()
271        )?;
272    }
273
274    if invalid_count > 0 {
275        return Err(IdbError::Parse(format!(
276            "{} pages with invalid checksums",
277            invalid_count
278        )));
279    }
280
281    Ok(())
282}
283
284/// Return a short string name for a checksum algorithm.
285fn algorithm_name(algo: ChecksumAlgorithm) -> &'static str {
286    match algo {
287        ChecksumAlgorithm::Crc32c => "crc32c",
288        ChecksumAlgorithm::InnoDB => "innodb",
289        ChecksumAlgorithm::MariaDbFullCrc32 => "mariadb_full_crc32",
290        ChecksumAlgorithm::None => "none",
291    }
292}
293
294/// Streaming mode: process pages one at a time via `for_each_page()`, writing
295/// each result immediately. No progress bar, no bulk memory allocation.
296/// JSON output uses NDJSON (one JSON object per line).
297fn execute_streaming(
298    opts: &ChecksumOptions,
299    ts: &mut crate::innodb::tablespace::Tablespace,
300    page_size: u32,
301    page_count: u64,
302    vendor_info: &crate::innodb::vendor::VendorInfo,
303    writer: &mut dyn Write,
304) -> Result<(), IdbError> {
305    let mut valid_count = 0u64;
306    let mut invalid_count = 0u64;
307    let mut empty_count = 0u64;
308    let mut lsn_mismatch_count = 0u64;
309
310    if !opts.json {
311        wprintln!(
312            writer,
313            "Validating checksums for {} ({} pages, page size {})...",
314            opts.file,
315            page_count,
316            page_size
317        )?;
318        wprintln!(writer)?;
319    }
320
321    ts.for_each_page(|page_num, page_data| {
322        let result = validate_page(page_data, page_size, vendor_info);
323
324        match &result {
325            PageResult::ParseError => {
326                invalid_count += 1;
327                if opts.json {
328                    let obj = PageChecksumJson {
329                        page_number: page_num,
330                        status: "error".to_string(),
331                        algorithm: "unknown".to_string(),
332                        stored_checksum: 0,
333                        calculated_checksum: 0,
334                        lsn_valid: false,
335                    };
336                    let line = serde_json::to_string(&obj)
337                        .map_err(|e| IdbError::Parse(format!("JSON error: {}", e)))?;
338                    wprintln!(writer, "{}", line)?;
339                } else {
340                    eprintln!("Page {}: Could not parse FIL header", page_num);
341                }
342            }
343            PageResult::Empty => {
344                empty_count += 1;
345                // In streaming JSON mode, skip empty pages (same as non-streaming)
346                if !opts.json && opts.verbose {
347                    wprintln!(writer, "Page {}: EMPTY", page_num)?;
348                }
349            }
350            PageResult::Validated {
351                csum_result,
352                lsn_valid,
353            } => {
354                if csum_result.valid {
355                    valid_count += 1;
356                } else {
357                    invalid_count += 1;
358                }
359                if !lsn_valid {
360                    lsn_mismatch_count += 1;
361                }
362
363                if opts.json {
364                    if opts.verbose || !csum_result.valid || !lsn_valid {
365                        let obj = PageChecksumJson {
366                            page_number: page_num,
367                            status: if csum_result.valid {
368                                "valid".to_string()
369                            } else {
370                                "invalid".to_string()
371                            },
372                            algorithm: algorithm_name(csum_result.algorithm).to_string(),
373                            stored_checksum: csum_result.stored_checksum,
374                            calculated_checksum: csum_result.calculated_checksum,
375                            lsn_valid: *lsn_valid,
376                        };
377                        let line = serde_json::to_string(&obj)
378                            .map_err(|e| IdbError::Parse(format!("JSON error: {}", e)))?;
379                        wprintln!(writer, "{}", line)?;
380                    }
381                } else {
382                    if csum_result.valid {
383                        if opts.verbose {
384                            wprintln!(
385                                writer,
386                                "Page {}: {} ({:?}, stored={}, calculated={})",
387                                page_num,
388                                "OK".green(),
389                                csum_result.algorithm,
390                                csum_result.stored_checksum,
391                                csum_result.calculated_checksum,
392                            )?;
393                        }
394                    } else {
395                        wprintln!(
396                            writer,
397                            "Page {}: {} checksum (stored={}, calculated={}, algorithm={:?})",
398                            page_num,
399                            "INVALID".red(),
400                            csum_result.stored_checksum,
401                            csum_result.calculated_checksum,
402                            csum_result.algorithm,
403                        )?;
404                    }
405
406                    if !lsn_valid && csum_result.valid {
407                        wprintln!(
408                            writer,
409                            "Page {}: {} - header LSN low32 does not match trailer",
410                            page_num,
411                            "LSN MISMATCH".yellow(),
412                        )?;
413                    }
414                }
415            }
416        }
417        Ok(())
418    })?;
419
420    if !opts.json {
421        wprintln!(writer)?;
422        wprintln!(writer, "Summary:")?;
423        wprintln!(writer, "  Total pages: {}", page_count)?;
424        wprintln!(writer, "  Empty pages: {}", empty_count)?;
425        wprintln!(writer, "  Valid checksums: {}", valid_count)?;
426        if invalid_count > 0 {
427            wprintln!(
428                writer,
429                "  Invalid checksums: {}",
430                format!("{}", invalid_count).red()
431            )?;
432        } else {
433            wprintln!(
434                writer,
435                "  Invalid checksums: {}",
436                format!("{}", invalid_count).green()
437            )?;
438        }
439        if lsn_mismatch_count > 0 {
440            wprintln!(
441                writer,
442                "  LSN mismatches: {}",
443                format!("{}", lsn_mismatch_count).yellow()
444            )?;
445        }
446    }
447
448    if invalid_count > 0 {
449        return Err(IdbError::Parse(format!(
450            "{} pages with invalid checksums",
451            invalid_count
452        )));
453    }
454
455    Ok(())
456}
457
458fn execute_csv_parallel(
459    all_data: &[u8],
460    ps: usize,
461    page_size: u32,
462    page_count: u64,
463    vendor_info: &crate::innodb::vendor::VendorInfo,
464    writer: &mut dyn Write,
465) -> Result<(), IdbError> {
466    use rayon::prelude::*;
467
468    wprintln!(
469        writer,
470        "page_number,status,algorithm,stored_checksum,calculated_checksum"
471    )?;
472
473    let results: Vec<(u64, PageResult)> = (0..page_count)
474        .into_par_iter()
475        .map(|page_num| {
476            let offset = page_num as usize * ps;
477            if offset + ps > all_data.len() {
478                return (page_num, PageResult::ParseError);
479            }
480            let page_data = &all_data[offset..offset + ps];
481            (page_num, validate_page(page_data, page_size, vendor_info))
482        })
483        .collect();
484
485    for (page_num, result) in results {
486        match result {
487            PageResult::Empty | PageResult::ParseError => {}
488            PageResult::Validated {
489                csum_result,
490                lsn_valid: _,
491            } => {
492                let algo = match csum_result.algorithm {
493                    ChecksumAlgorithm::Crc32c => "crc32c",
494                    ChecksumAlgorithm::InnoDB => "innodb",
495                    ChecksumAlgorithm::MariaDbFullCrc32 => "mariadb_full_crc32",
496                    ChecksumAlgorithm::None => "none",
497                };
498                let status = if csum_result.valid {
499                    "valid"
500                } else {
501                    "invalid"
502                };
503                wprintln!(
504                    writer,
505                    "{},{},{},{},{}",
506                    page_num,
507                    status,
508                    algo,
509                    csum_result.stored_checksum,
510                    csum_result.calculated_checksum
511                )?;
512            }
513        }
514    }
515    Ok(())
516}
517
518fn execute_json_parallel(
519    opts: &ChecksumOptions,
520    all_data: &[u8],
521    ps: usize,
522    page_size: u32,
523    page_count: u64,
524    vendor_info: &crate::innodb::vendor::VendorInfo,
525    writer: &mut dyn Write,
526) -> Result<(), IdbError> {
527    // Process all pages in parallel
528    let results: Vec<(u64, PageResult)> = (0..page_count)
529        .into_par_iter()
530        .map(|page_num| {
531            let offset = page_num as usize * ps;
532            if offset + ps > all_data.len() {
533                return (page_num, PageResult::ParseError);
534            }
535            let page_data = &all_data[offset..offset + ps];
536            (page_num, validate_page(page_data, page_size, vendor_info))
537        })
538        .collect();
539
540    let mut valid_count = 0u64;
541    let mut invalid_count = 0u64;
542    let mut empty_count = 0u64;
543    let mut lsn_mismatch_count = 0u64;
544    let mut pages = Vec::new();
545
546    for (page_num, result) in &results {
547        match result {
548            PageResult::ParseError => {
549                invalid_count += 1;
550                if opts.verbose {
551                    pages.push(PageChecksumJson {
552                        page_number: *page_num,
553                        status: "error".to_string(),
554                        algorithm: "unknown".to_string(),
555                        stored_checksum: 0,
556                        calculated_checksum: 0,
557                        lsn_valid: false,
558                    });
559                }
560            }
561            PageResult::Empty => {
562                empty_count += 1;
563            }
564            PageResult::Validated {
565                csum_result,
566                lsn_valid,
567            } => {
568                if csum_result.valid {
569                    valid_count += 1;
570                } else {
571                    invalid_count += 1;
572                }
573                if !lsn_valid {
574                    lsn_mismatch_count += 1;
575                }
576
577                if opts.verbose || !csum_result.valid || !lsn_valid {
578                    pages.push(PageChecksumJson {
579                        page_number: *page_num,
580                        status: if csum_result.valid {
581                            "valid".to_string()
582                        } else {
583                            "invalid".to_string()
584                        },
585                        algorithm: algorithm_name(csum_result.algorithm).to_string(),
586                        stored_checksum: csum_result.stored_checksum,
587                        calculated_checksum: csum_result.calculated_checksum,
588                        lsn_valid: *lsn_valid,
589                    });
590                }
591            }
592        }
593    }
594
595    let summary = ChecksumSummaryJson {
596        file: opts.file.clone(),
597        page_size,
598        total_pages: page_count,
599        empty_pages: empty_count,
600        valid_pages: valid_count,
601        invalid_pages: invalid_count,
602        lsn_mismatches: lsn_mismatch_count,
603        pages,
604    };
605
606    let json = serde_json::to_string_pretty(&summary)
607        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
608    wprintln!(writer, "{}", json)?;
609
610    if invalid_count > 0 {
611        return Err(IdbError::Parse(format!(
612            "{} pages with invalid checksums",
613            invalid_count
614        )));
615    }
616
617    Ok(())
618}