use std::io::Write;
use colored::Colorize;
use rayon::prelude::*;
use serde::Serialize;
use crate::cli::{create_progress_bar, wprintln};
use crate::innodb::checksum::{validate_checksum, validate_lsn, ChecksumAlgorithm, ChecksumResult};
use crate::innodb::page::FilHeader;
use crate::IdbError;
/// CLI options for the `checksum` subcommand.
pub struct ChecksumOptions {
    /// Path to the tablespace file to validate.
    pub file: String,
    /// Also print per-page status for valid and empty pages, not only failures.
    pub verbose: bool,
    /// Emit machine-readable JSON instead of the text report.
    pub json: bool,
    /// Emit one CSV row per validated page instead of the text report.
    pub csv: bool,
    /// Explicit page size; passed through to `open_tablespace` (presumably
    /// autodetected when `None` — confirm against `open_tablespace`).
    pub page_size: Option<u32>,
    /// Optional keyring file path used to set up decryption before reading.
    pub keyring: Option<String>,
    /// Worker thread count. NOTE(review): not read anywhere in this module —
    /// presumably consumed elsewhere when configuring the rayon pool; confirm.
    pub threads: usize,
    /// Memory-map the file instead of buffered reads.
    pub mmap: bool,
    /// Validate page-by-page with constant memory (sequential) instead of
    /// reading the whole file up front and validating in parallel.
    pub streaming: bool,
}
/// Top-level JSON document emitted by the batch JSON path
/// (`execute_json_parallel`), serialized pretty-printed.
#[derive(Serialize)]
struct ChecksumSummaryJson {
    // Path of the validated tablespace file.
    file: String,
    // Page size in bytes used for validation.
    page_size: u32,
    // Total number of pages in the file.
    total_pages: u64,
    // Pages that were all zero.
    empty_pages: u64,
    // Pages whose stored checksum matched the calculated one.
    valid_pages: u64,
    // Pages with a checksum mismatch or an unparseable FIL header.
    invalid_pages: u64,
    // Pages whose header LSN low 32 bits did not match the trailer.
    lsn_mismatches: u64,
    // Per-page detail entries; the field is omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pages: Vec<PageChecksumJson>,
}
/// Per-page detail record, used both in the batch summary's `pages` array
/// and as one-object-per-line output in streaming JSON mode.
#[derive(Serialize)]
struct PageChecksumJson {
    // Page number within the tablespace.
    page_number: u64,
    // "valid", "invalid", or "error" (FIL header could not be parsed).
    status: String,
    // Lowercase algorithm label from `algorithm_name`, or "unknown" on error.
    algorithm: String,
    // Checksum stored in the page; 0 when the header could not be parsed.
    stored_checksum: u32,
    // Checksum recomputed from the page contents; 0 on parse error.
    calculated_checksum: u32,
    // Whether the header LSN low 32 bits match the page trailer.
    lsn_valid: bool,
}
/// Outcome of validating a single page.
enum PageResult {
    // The FIL header could not be parsed (also used for a truncated
    // final page in the batch paths).
    ParseError,
    // All-zero page; checksum/LSN validation is skipped.
    Empty,
    // Page was validated: checksum result plus LSN header/trailer agreement.
    Validated {
        csum_result: ChecksumResult,
        lsn_valid: bool,
    },
}
/// Classify one page: unparseable header, all-zero page, or a validated
/// page carrying its checksum result and LSN header/trailer agreement.
fn validate_page(
    page_data: &[u8],
    page_size: u32,
    vendor_info: &crate::innodb::vendor::VendorInfo,
) -> PageResult {
    let header = match FilHeader::parse(page_data) {
        None => return PageResult::ParseError,
        Some(h) => h,
    };
    // Cheap checksum-field test first; the full byte scan only runs when
    // the stored checksum is zero.
    let is_all_zero = header.checksum == 0 && page_data.iter().all(|&b| b == 0);
    if is_all_zero {
        PageResult::Empty
    } else {
        PageResult::Validated {
            csum_result: validate_checksum(page_data, page_size, Some(vendor_info)),
            lsn_valid: validate_lsn(page_data, page_size),
        }
    }
}
/// Entry point for the `checksum` subcommand: validate every page's checksum
/// (and LSN trailer agreement) in a tablespace file and write a report to
/// `writer`.
///
/// Dispatch order: streaming mode (constant memory, sequential), then JSON
/// and CSV batch modes (whole file in memory, parallel validation); otherwise
/// the human-readable text report produced below.
///
/// # Errors
/// Returns `Err(IdbError::Parse)` when any page has an invalid checksum, in
/// addition to propagating open/read/serialization errors.
pub fn execute(opts: &ChecksumOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
    let mut ts = crate::cli::open_tablespace(&opts.file, opts.page_size, opts.mmap)?;
    if let Some(ref keyring_path) = opts.keyring {
        // Decryption must be configured before any pages are read.
        crate::cli::setup_decryption(&mut ts, keyring_path)?;
    }
    let page_size = ts.page_size();
    let page_count = ts.page_count();
    // Cloned so the parallel closure below can borrow it independently of `ts`.
    let vendor_info = ts.vendor_info().clone();
    if opts.streaming {
        return execute_streaming(opts, &mut ts, page_size, page_count, &vendor_info, writer);
    }
    // Batch modes: load the entire file, then validate pages in parallel.
    let all_data = ts.read_all_pages()?;
    let ps = page_size as usize;
    if opts.json {
        return execute_json_parallel(
            opts,
            &all_data,
            ps,
            page_size,
            page_count,
            &vendor_info,
            writer,
        );
    }
    if opts.csv {
        return execute_csv_parallel(&all_data, ps, page_size, page_count, &vendor_info, writer);
    }
    wprintln!(
        writer,
        "Validating checksums for {} ({} pages, page size {})...",
        opts.file,
        page_count,
        page_size
    )?;
    wprintln!(writer)?;
    let pb = create_progress_bar(page_count, "pages");
    // Validate in parallel; each result carries its page number so the
    // sequential reporting loop below stays in page order.
    let results: Vec<(u64, PageResult)> = (0..page_count)
        .into_par_iter()
        .map(|page_num| {
            let offset = page_num as usize * ps;
            if offset + ps > all_data.len() {
                // Truncated file: the final page is incomplete.
                pb.inc(1);
                return (page_num, PageResult::ParseError);
            }
            let page_data = &all_data[offset..offset + ps];
            let result = validate_page(page_data, page_size, &vendor_info);
            pb.inc(1);
            (page_num, result)
        })
        .collect();
    pb.finish_and_clear();
    let mut valid_count = 0u64;
    let mut invalid_count = 0u64;
    let mut empty_count = 0u64;
    let mut lsn_mismatch_count = 0u64;
    for (page_num, result) in &results {
        match result {
            PageResult::ParseError => {
                // NOTE(review): parse failures go to stderr via eprintln!,
                // not `writer` like every other message — presumably
                // intentional (matches execute_streaming); confirm.
                eprintln!("Page {}: Could not parse FIL header", page_num);
                invalid_count += 1;
            }
            PageResult::Empty => {
                empty_count += 1;
                if opts.verbose {
                    wprintln!(writer, "Page {}: EMPTY", page_num)?;
                }
            }
            PageResult::Validated {
                csum_result,
                lsn_valid,
            } => {
                if csum_result.valid {
                    valid_count += 1;
                    if opts.verbose {
                        wprintln!(
                            writer,
                            "Page {}: {} ({:?}, stored={}, calculated={})",
                            page_num,
                            "OK".green(),
                            csum_result.algorithm,
                            csum_result.stored_checksum,
                            csum_result.calculated_checksum,
                        )?;
                    }
                } else {
                    invalid_count += 1;
                    wprintln!(
                        writer,
                        "Page {}: {} checksum (stored={}, calculated={}, algorithm={:?})",
                        page_num,
                        "INVALID".red(),
                        csum_result.stored_checksum,
                        csum_result.calculated_checksum,
                        csum_result.algorithm,
                    )?;
                }
                if !lsn_valid {
                    lsn_mismatch_count += 1;
                    // Only report the LSN mismatch separately when the
                    // checksum was OK; an invalid page was already reported.
                    if csum_result.valid {
                        wprintln!(
                            writer,
                            "Page {}: {} - header LSN low32 does not match trailer",
                            page_num,
                            "LSN MISMATCH".yellow(),
                        )?;
                    }
                }
            }
        }
    }
    wprintln!(writer)?;
    wprintln!(writer, "Summary:")?;
    wprintln!(writer, " Total pages: {}", page_count)?;
    wprintln!(writer, " Empty pages: {}", empty_count)?;
    wprintln!(writer, " Valid checksums: {}", valid_count)?;
    if invalid_count > 0 {
        wprintln!(
            writer,
            " Invalid checksums: {}",
            format!("{}", invalid_count).red()
        )?;
    } else {
        wprintln!(
            writer,
            " Invalid checksums: {}",
            format!("{}", invalid_count).green()
        )?;
    }
    if lsn_mismatch_count > 0 {
        wprintln!(
            writer,
            " LSN mismatches: {}",
            format!("{}", lsn_mismatch_count).yellow()
        )?;
    }
    // Any invalid checksum fails the command so the CLI exits non-zero.
    if invalid_count > 0 {
        return Err(IdbError::Parse(format!(
            "{} pages with invalid checksums",
            invalid_count
        )));
    }
    Ok(())
}
/// Stable lowercase label for a checksum algorithm, used in JSON and CSV
/// output. Exhaustive on purpose: adding a variant must not compile until a
/// label is chosen here.
fn algorithm_name(algo: ChecksumAlgorithm) -> &'static str {
    match algo {
        ChecksumAlgorithm::Crc32c => "crc32c",
        ChecksumAlgorithm::InnoDB => "innodb",
        ChecksumAlgorithm::MariaDbFullCrc32 => "mariadb_full_crc32",
        ChecksumAlgorithm::None => "none",
    }
}
/// Streaming (constant-memory) variant of `execute`: pages are validated
/// sequentially via `Tablespace::for_each_page` instead of loading the whole
/// file. Text output matches the batch path; JSON output here is one object
/// per reported page (JSON lines), not the batch summary document.
///
/// # Errors
/// Returns `Err(IdbError::Parse)` when any page has an invalid checksum, in
/// addition to propagating read/serialization errors.
fn execute_streaming(
    opts: &ChecksumOptions,
    ts: &mut crate::innodb::tablespace::Tablespace,
    page_size: u32,
    page_count: u64,
    vendor_info: &crate::innodb::vendor::VendorInfo,
    writer: &mut dyn Write,
) -> Result<(), IdbError> {
    // Counters are captured mutably by the per-page closure below and read
    // again after the iteration for the summary.
    let mut valid_count = 0u64;
    let mut invalid_count = 0u64;
    let mut empty_count = 0u64;
    let mut lsn_mismatch_count = 0u64;
    if !opts.json {
        wprintln!(
            writer,
            "Validating checksums for {} ({} pages, page size {})...",
            opts.file,
            page_count,
            page_size
        )?;
        wprintln!(writer)?;
    }
    ts.for_each_page(|page_num, page_data| {
        let result = validate_page(page_data, page_size, vendor_info);
        match &result {
            PageResult::ParseError => {
                invalid_count += 1;
                if opts.json {
                    // Parse errors are always emitted in JSON mode.
                    let obj = PageChecksumJson {
                        page_number: page_num,
                        status: "error".to_string(),
                        algorithm: "unknown".to_string(),
                        stored_checksum: 0,
                        calculated_checksum: 0,
                        lsn_valid: false,
                    };
                    let line = serde_json::to_string(&obj)
                        .map_err(|e| IdbError::Parse(format!("JSON error: {}", e)))?;
                    wprintln!(writer, "{}", line)?;
                } else {
                    // NOTE(review): goes to stderr, not `writer`, matching
                    // the batch path — presumably intentional; confirm.
                    eprintln!("Page {}: Could not parse FIL header", page_num);
                }
            }
            PageResult::Empty => {
                empty_count += 1;
                if !opts.json && opts.verbose {
                    wprintln!(writer, "Page {}: EMPTY", page_num)?;
                }
            }
            PageResult::Validated {
                csum_result,
                lsn_valid,
            } => {
                if csum_result.valid {
                    valid_count += 1;
                } else {
                    invalid_count += 1;
                }
                if !lsn_valid {
                    lsn_mismatch_count += 1;
                }
                if opts.json {
                    // Emit a JSON line only for failures, or for every page
                    // when verbose.
                    if opts.verbose || !csum_result.valid || !lsn_valid {
                        let obj = PageChecksumJson {
                            page_number: page_num,
                            status: if csum_result.valid {
                                "valid".to_string()
                            } else {
                                "invalid".to_string()
                            },
                            algorithm: algorithm_name(csum_result.algorithm).to_string(),
                            stored_checksum: csum_result.stored_checksum,
                            calculated_checksum: csum_result.calculated_checksum,
                            lsn_valid: *lsn_valid,
                        };
                        let line = serde_json::to_string(&obj)
                            .map_err(|e| IdbError::Parse(format!("JSON error: {}", e)))?;
                        wprintln!(writer, "{}", line)?;
                    }
                } else {
                    if csum_result.valid {
                        if opts.verbose {
                            wprintln!(
                                writer,
                                "Page {}: {} ({:?}, stored={}, calculated={})",
                                page_num,
                                "OK".green(),
                                csum_result.algorithm,
                                csum_result.stored_checksum,
                                csum_result.calculated_checksum,
                            )?;
                        }
                    } else {
                        wprintln!(
                            writer,
                            "Page {}: {} checksum (stored={}, calculated={}, algorithm={:?})",
                            page_num,
                            "INVALID".red(),
                            csum_result.stored_checksum,
                            csum_result.calculated_checksum,
                            csum_result.algorithm,
                        )?;
                    }
                    // Separate LSN warning only when the checksum itself was
                    // OK; an invalid page was already reported above.
                    if !lsn_valid && csum_result.valid {
                        wprintln!(
                            writer,
                            "Page {}: {} - header LSN low32 does not match trailer",
                            page_num,
                            "LSN MISMATCH".yellow(),
                        )?;
                    }
                }
            }
        }
        Ok(())
    })?;
    if !opts.json {
        wprintln!(writer)?;
        wprintln!(writer, "Summary:")?;
        wprintln!(writer, " Total pages: {}", page_count)?;
        wprintln!(writer, " Empty pages: {}", empty_count)?;
        wprintln!(writer, " Valid checksums: {}", valid_count)?;
        if invalid_count > 0 {
            wprintln!(
                writer,
                " Invalid checksums: {}",
                format!("{}", invalid_count).red()
            )?;
        } else {
            wprintln!(
                writer,
                " Invalid checksums: {}",
                format!("{}", invalid_count).green()
            )?;
        }
        if lsn_mismatch_count > 0 {
            wprintln!(
                writer,
                " LSN mismatches: {}",
                format!("{}", lsn_mismatch_count).yellow()
            )?;
        }
    }
    // Any invalid checksum fails the command so the CLI exits non-zero.
    if invalid_count > 0 {
        return Err(IdbError::Parse(format!(
            "{} pages with invalid checksums",
            invalid_count
        )));
    }
    Ok(())
}
/// Validate all pages in parallel and emit one CSV row per validated page.
///
/// Output columns: `page_number,status,algorithm,stored_checksum,calculated_checksum`.
/// Empty and unparseable pages produce no row.
///
/// NOTE(review): unlike the text and JSON paths, CSV mode never returns an
/// error for invalid checksums — confirm whether callers rely on CSV mode
/// always exiting successfully before aligning it.
fn execute_csv_parallel(
    all_data: &[u8],
    ps: usize,
    page_size: u32,
    page_count: u64,
    vendor_info: &crate::innodb::vendor::VendorInfo,
    writer: &mut dyn Write,
) -> Result<(), IdbError> {
    wprintln!(
        writer,
        "page_number,status,algorithm,stored_checksum,calculated_checksum"
    )?;
    // Validate in parallel, then write rows sequentially in page order.
    let results: Vec<(u64, PageResult)> = (0..page_count)
        .into_par_iter()
        .map(|page_num| {
            let offset = page_num as usize * ps;
            if offset + ps > all_data.len() {
                // Truncated file: the final page is incomplete.
                return (page_num, PageResult::ParseError);
            }
            let page_data = &all_data[offset..offset + ps];
            (page_num, validate_page(page_data, page_size, vendor_info))
        })
        .collect();
    for (page_num, result) in results {
        match result {
            // Empty and unparseable pages are omitted from the CSV.
            PageResult::Empty | PageResult::ParseError => {}
            PageResult::Validated {
                csum_result,
                lsn_valid: _,
            } => {
                let status = if csum_result.valid { "valid" } else { "invalid" };
                wprintln!(
                    writer,
                    "{},{},{},{},{}",
                    page_num,
                    status,
                    // Shared label helper keeps CSV and JSON names in sync
                    // (previously a duplicated match on ChecksumAlgorithm).
                    algorithm_name(csum_result.algorithm),
                    csum_result.stored_checksum,
                    csum_result.calculated_checksum
                )?;
            }
        }
    }
    Ok(())
}
/// Validate all pages in parallel and emit a single pretty-printed JSON
/// summary document (`ChecksumSummaryJson`).
///
/// Per-page entries are included for every failure — invalid checksum, LSN
/// mismatch, or unparseable FIL header — and, with `--verbose`, for valid
/// pages as well. Unparseable pages were previously reported only in verbose
/// mode even though they were counted as invalid; they are now always
/// reported, matching the streaming JSON path.
///
/// # Errors
/// Returns `Err(IdbError::Parse)` when any page has an invalid checksum, or
/// when JSON serialization fails.
fn execute_json_parallel(
    opts: &ChecksumOptions,
    all_data: &[u8],
    ps: usize,
    page_size: u32,
    page_count: u64,
    vendor_info: &crate::innodb::vendor::VendorInfo,
    writer: &mut dyn Write,
) -> Result<(), IdbError> {
    // Validate in parallel; aggregate sequentially below in page order.
    let results: Vec<(u64, PageResult)> = (0..page_count)
        .into_par_iter()
        .map(|page_num| {
            let offset = page_num as usize * ps;
            if offset + ps > all_data.len() {
                // Truncated file: the final page is incomplete.
                return (page_num, PageResult::ParseError);
            }
            let page_data = &all_data[offset..offset + ps];
            (page_num, validate_page(page_data, page_size, vendor_info))
        })
        .collect();
    let mut valid_count = 0u64;
    let mut invalid_count = 0u64;
    let mut empty_count = 0u64;
    let mut lsn_mismatch_count = 0u64;
    let mut pages = Vec::new();
    for (page_num, result) in &results {
        match result {
            PageResult::ParseError => {
                invalid_count += 1;
                // Always report unparseable pages: they count as invalid, so
                // hiding them unless --verbose made the default JSON report
                // inconsistent with its own summary counts (and with the
                // streaming JSON path, which always emits them).
                pages.push(PageChecksumJson {
                    page_number: *page_num,
                    status: "error".to_string(),
                    algorithm: "unknown".to_string(),
                    stored_checksum: 0,
                    calculated_checksum: 0,
                    lsn_valid: false,
                });
            }
            PageResult::Empty => {
                empty_count += 1;
            }
            PageResult::Validated {
                csum_result,
                lsn_valid,
            } => {
                if csum_result.valid {
                    valid_count += 1;
                } else {
                    invalid_count += 1;
                }
                if !lsn_valid {
                    lsn_mismatch_count += 1;
                }
                // Detail entries for failures always; for valid pages only
                // when verbose.
                if opts.verbose || !csum_result.valid || !lsn_valid {
                    pages.push(PageChecksumJson {
                        page_number: *page_num,
                        status: if csum_result.valid {
                            "valid".to_string()
                        } else {
                            "invalid".to_string()
                        },
                        algorithm: algorithm_name(csum_result.algorithm).to_string(),
                        stored_checksum: csum_result.stored_checksum,
                        calculated_checksum: csum_result.calculated_checksum,
                        lsn_valid: *lsn_valid,
                    });
                }
            }
        }
    }
    let summary = ChecksumSummaryJson {
        file: opts.file.clone(),
        page_size,
        total_pages: page_count,
        empty_pages: empty_count,
        valid_pages: valid_count,
        invalid_pages: invalid_count,
        lsn_mismatches: lsn_mismatch_count,
        pages,
    };
    let json = serde_json::to_string_pretty(&summary)
        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
    wprintln!(writer, "{}", json)?;
    // Any invalid checksum fails the command so the CLI exits non-zero.
    if invalid_count > 0 {
        return Err(IdbError::Parse(format!(
            "{} pages with invalid checksums",
            invalid_count
        )));
    }
    Ok(())
}