Skip to main content

idb/innodb/
verify.rs

1//! Structural validation for InnoDB tablespace files.
2//!
3//! Performs pure structural checks on tablespace pages without
4//! requiring checksums to be valid — useful for catching
5//! logical corruption and metadata inconsistencies.
6
7use byteorder::{BigEndian, ByteOrder};
8use serde::Serialize;
9
10use crate::innodb::constants::*;
11use crate::innodb::index::IndexHeader;
12use crate::innodb::page::FilHeader;
13use crate::innodb::page_types::PageType;
14#[cfg(not(target_arch = "wasm32"))]
15use crate::IdbError;
16
17/// Kind of structural check performed.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
19pub enum VerifyCheckKind {
20    /// Page number at offset 4 matches expected position.
21    PageNumberSequence,
22    /// All pages share the same space_id as page 0.
23    SpaceIdConsistency,
24    /// LSNs are non-decreasing across pages (within tolerance).
25    LsnMonotonicity,
26    /// INDEX pages: B+Tree level is within reasonable bounds.
27    BTreeLevelConsistency,
28    /// prev/next page pointers within file bounds; first page has prev == FIL_NULL.
29    PageChainBounds,
30    /// Trailer LSN low-32 matches header LSN low-32.
31    TrailerLsnMatch,
32}
33
34impl std::fmt::Display for VerifyCheckKind {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            VerifyCheckKind::PageNumberSequence => write!(f, "page_number_sequence"),
38            VerifyCheckKind::SpaceIdConsistency => write!(f, "space_id_consistency"),
39            VerifyCheckKind::LsnMonotonicity => write!(f, "lsn_monotonicity"),
40            VerifyCheckKind::BTreeLevelConsistency => write!(f, "btree_level_consistency"),
41            VerifyCheckKind::PageChainBounds => write!(f, "page_chain_bounds"),
42            VerifyCheckKind::TrailerLsnMatch => write!(f, "trailer_lsn_match"),
43        }
44    }
45}
46
47/// A single finding from structural verification.
48#[derive(Debug, Clone, Serialize)]
49pub struct VerifyFinding {
50    /// Which structural check produced this finding.
51    pub kind: VerifyCheckKind,
52    /// The page number where the issue was found.
53    pub page_number: u64,
54    /// Human-readable description of the issue.
55    pub message: String,
56    /// Expected value (if applicable).
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub expected: Option<String>,
59    /// Actual value found (if applicable).
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub actual: Option<String>,
62}
63
64/// Summary of a single check kind across the tablespace.
65#[derive(Debug, Clone, Serialize)]
66pub struct CheckSummary {
67    /// Which check this summary is for.
68    pub kind: VerifyCheckKind,
69    /// Number of pages examined for this check.
70    pub pages_checked: u64,
71    /// Number of issues found.
72    pub issues_found: u64,
73    /// Whether the check passed (no issues found).
74    pub passed: bool,
75}
76
/// Switches selecting which structural checks are run.
pub struct VerifyConfig {
    /// Compare each header's page number with the page's position in the file.
    pub check_page_numbers: bool,
    /// Compare each header's space ID with the expected space ID.
    pub check_space_ids: bool,
    /// Flag pages whose LSN drops significantly below the previous page's LSN.
    pub check_lsn_monotonicity: bool,
    /// Flag INDEX pages whose B+Tree level is unreasonably large.
    pub check_btree_levels: bool,
    /// Flag prev/next page pointers that point outside the file.
    pub check_chain_bounds: bool,
    /// Compare the low 32 bits of the trailer LSN with those of the header LSN.
    pub check_trailer_lsn: bool,
}
92
93impl Default for VerifyConfig {
94    fn default() -> Self {
95        Self {
96            check_page_numbers: true,
97            check_space_ids: true,
98            check_lsn_monotonicity: true,
99            check_btree_levels: true,
100            check_chain_bounds: true,
101            check_trailer_lsn: true,
102        }
103    }
104}
105
106/// Full verification report for a tablespace.
107#[derive(Debug, Clone, Serialize)]
108pub struct VerifyReport {
109    /// Path to the tablespace file.
110    pub file: String,
111    /// Total number of pages in the file.
112    pub total_pages: u64,
113    /// Page size in bytes.
114    pub page_size: u32,
115    /// Whether all checks passed.
116    pub passed: bool,
117    /// Individual findings (issues found).
118    #[serde(skip_serializing_if = "Vec::is_empty")]
119    pub findings: Vec<VerifyFinding>,
120    /// Per-check summaries.
121    pub summary: Vec<CheckSummary>,
122}
123
124/// Verify a tablespace by running all structural checks.
125///
126/// Takes all pages as a flat byte slice, the page size, and the space_id
127/// from page 0. Returns a `VerifyReport` with findings and per-check summaries.
128pub fn verify_tablespace(
129    all_pages: &[u8],
130    page_size: u32,
131    space_id: u32,
132    file: &str,
133    config: &VerifyConfig,
134) -> VerifyReport {
135    let ps = page_size as usize;
136    let total_pages = (all_pages.len() / ps) as u64;
137    let mut findings = Vec::new();
138
139    // Collect per-check counters
140    let mut page_num_checked = 0u64;
141    let mut page_num_issues = 0u64;
142    let mut space_id_checked = 0u64;
143    let mut space_id_issues = 0u64;
144    let mut lsn_checked = 0u64;
145    let mut lsn_issues = 0u64;
146    let mut btree_checked = 0u64;
147    let mut btree_issues = 0u64;
148    let mut chain_checked = 0u64;
149    let mut chain_issues = 0u64;
150    let mut trailer_checked = 0u64;
151    let mut trailer_issues = 0u64;
152
153    let mut prev_lsn: u64 = 0;
154
155    for page_idx in 0..total_pages {
156        let offset = page_idx as usize * ps;
157        let page_data = &all_pages[offset..offset + ps];
158
159        // Skip all-zero pages
160        if page_data.iter().all(|&b| b == 0) {
161            continue;
162        }
163
164        let header = match FilHeader::parse(page_data) {
165            Some(h) => h,
166            None => continue,
167        };
168
169        // Check 1: Page number sequence
170        if config.check_page_numbers {
171            page_num_checked += 1;
172            if header.page_number as u64 != page_idx {
173                page_num_issues += 1;
174                findings.push(VerifyFinding {
175                    kind: VerifyCheckKind::PageNumberSequence,
176                    page_number: page_idx,
177                    message: format!(
178                        "Page {} has page_number {} in header",
179                        page_idx, header.page_number
180                    ),
181                    expected: Some(page_idx.to_string()),
182                    actual: Some(header.page_number.to_string()),
183                });
184            }
185        }
186
187        // Check 2: Space ID consistency
188        if config.check_space_ids {
189            space_id_checked += 1;
190            if header.space_id != space_id {
191                space_id_issues += 1;
192                findings.push(VerifyFinding {
193                    kind: VerifyCheckKind::SpaceIdConsistency,
194                    page_number: page_idx,
195                    message: format!(
196                        "Page {} has space_id {} (expected {})",
197                        page_idx, header.space_id, space_id
198                    ),
199                    expected: Some(space_id.to_string()),
200                    actual: Some(header.space_id.to_string()),
201                });
202            }
203        }
204
205        // Check 3: LSN monotonicity
206        if config.check_lsn_monotonicity && page_idx > 0 {
207            lsn_checked += 1;
208            // Allow page 0 to have any LSN; subsequent pages should not decrease dramatically.
209            // Minor LSN non-monotonicity is normal due to page flushing order.
210            // We only flag significant drops (> 50% of previous LSN) as issues.
211            if header.lsn > 0 && prev_lsn > 0 && header.lsn < prev_lsn / 2 {
212                lsn_issues += 1;
213                findings.push(VerifyFinding {
214                    kind: VerifyCheckKind::LsnMonotonicity,
215                    page_number: page_idx,
216                    message: format!(
217                        "Page {} LSN {} is significantly lower than previous {}",
218                        page_idx, header.lsn, prev_lsn
219                    ),
220                    expected: Some(format!(">= {}", prev_lsn / 2)),
221                    actual: Some(header.lsn.to_string()),
222                });
223            }
224        }
225        if header.lsn > 0 {
226            prev_lsn = header.lsn;
227        }
228
229        // Check 4: B+Tree level consistency (INDEX pages only)
230        if config.check_btree_levels && header.page_type == PageType::Index {
231            if let Some(idx_header) = IndexHeader::parse(page_data) {
232                btree_checked += 1;
233                // Level 0 = leaf. Level > 0 = internal.
234                // Max InnoDB B+Tree depth is ~64; flag unreasonable values.
235                if idx_header.level > 64 {
236                    btree_issues += 1;
237                    findings.push(VerifyFinding {
238                        kind: VerifyCheckKind::BTreeLevelConsistency,
239                        page_number: page_idx,
240                        message: format!(
241                            "Page {} has unreasonable B+Tree level {}",
242                            page_idx, idx_header.level
243                        ),
244                        expected: Some("<= 64".to_string()),
245                        actual: Some(idx_header.level.to_string()),
246                    });
247                }
248            }
249        }
250
251        // Check 5: Page chain bounds
252        if config.check_chain_bounds {
253            chain_checked += 1;
254            if header.prev_page != FIL_NULL && header.prev_page as u64 >= total_pages {
255                chain_issues += 1;
256                findings.push(VerifyFinding {
257                    kind: VerifyCheckKind::PageChainBounds,
258                    page_number: page_idx,
259                    message: format!(
260                        "Page {} prev pointer {} is out of bounds (total: {})",
261                        page_idx, header.prev_page, total_pages
262                    ),
263                    expected: Some(format!("< {} or FIL_NULL", total_pages)),
264                    actual: Some(header.prev_page.to_string()),
265                });
266            }
267            if header.next_page != FIL_NULL && header.next_page as u64 >= total_pages {
268                chain_issues += 1;
269                findings.push(VerifyFinding {
270                    kind: VerifyCheckKind::PageChainBounds,
271                    page_number: page_idx,
272                    message: format!(
273                        "Page {} next pointer {} is out of bounds (total: {})",
274                        page_idx, header.next_page, total_pages
275                    ),
276                    expected: Some(format!("< {} or FIL_NULL", total_pages)),
277                    actual: Some(header.next_page.to_string()),
278                });
279            }
280        }
281
282        // Check 6: Trailer LSN match
283        if config.check_trailer_lsn {
284            trailer_checked += 1;
285            let trailer_offset = ps - SIZE_FIL_TRAILER;
286            if page_data.len() >= trailer_offset + 8 {
287                let trailer_lsn_low =
288                    BigEndian::read_u32(&page_data[trailer_offset + 4..trailer_offset + 8]);
289                let header_lsn_low = (header.lsn & 0xFFFFFFFF) as u32;
290                if trailer_lsn_low != header_lsn_low {
291                    trailer_issues += 1;
292                    findings.push(VerifyFinding {
293                        kind: VerifyCheckKind::TrailerLsnMatch,
294                        page_number: page_idx,
295                        message: format!(
296                            "Page {} header LSN low32 0x{:08X} != trailer 0x{:08X}",
297                            page_idx, header_lsn_low, trailer_lsn_low
298                        ),
299                        expected: Some(format!("0x{:08X}", header_lsn_low)),
300                        actual: Some(format!("0x{:08X}", trailer_lsn_low)),
301                    });
302                }
303            }
304        }
305    }
306
307    // Build summaries
308    let mut summary = Vec::new();
309    if config.check_page_numbers {
310        summary.push(CheckSummary {
311            kind: VerifyCheckKind::PageNumberSequence,
312            pages_checked: page_num_checked,
313            issues_found: page_num_issues,
314            passed: page_num_issues == 0,
315        });
316    }
317    if config.check_space_ids {
318        summary.push(CheckSummary {
319            kind: VerifyCheckKind::SpaceIdConsistency,
320            pages_checked: space_id_checked,
321            issues_found: space_id_issues,
322            passed: space_id_issues == 0,
323        });
324    }
325    if config.check_lsn_monotonicity {
326        summary.push(CheckSummary {
327            kind: VerifyCheckKind::LsnMonotonicity,
328            pages_checked: lsn_checked,
329            issues_found: lsn_issues,
330            passed: lsn_issues == 0,
331        });
332    }
333    if config.check_btree_levels {
334        summary.push(CheckSummary {
335            kind: VerifyCheckKind::BTreeLevelConsistency,
336            pages_checked: btree_checked,
337            issues_found: btree_issues,
338            passed: btree_issues == 0,
339        });
340    }
341    if config.check_chain_bounds {
342        summary.push(CheckSummary {
343            kind: VerifyCheckKind::PageChainBounds,
344            pages_checked: chain_checked,
345            issues_found: chain_issues,
346            passed: chain_issues == 0,
347        });
348    }
349    if config.check_trailer_lsn {
350        summary.push(CheckSummary {
351            kind: VerifyCheckKind::TrailerLsnMatch,
352            pages_checked: trailer_checked,
353            issues_found: trailer_issues,
354            passed: trailer_issues == 0,
355        });
356    }
357
358    let passed = summary.iter().all(|s| s.passed);
359
360    VerifyReport {
361        file: file.to_string(),
362        total_pages,
363        page_size,
364        passed,
365        findings,
366        summary,
367    }
368}
369
370// ---------------------------------------------------------------------------
371// Redo log continuity verification (#102)
372// ---------------------------------------------------------------------------
373
374/// Result of verifying redo log continuity against a tablespace.
375#[derive(Debug, Clone, Serialize)]
376pub struct RedoVerifyResult {
377    /// Path to the redo log file.
378    pub redo_file: String,
379    /// Checkpoint LSN from the redo log.
380    pub checkpoint_lsn: u64,
381    /// Maximum LSN found across all tablespace pages.
382    pub tablespace_max_lsn: u64,
383    /// Whether the redo log covers the tablespace (checkpoint >= max page LSN).
384    pub covers_tablespace: bool,
385    /// LSN gap (tablespace_max_lsn - checkpoint_lsn) if not covered; 0 otherwise.
386    pub lsn_gap: u64,
387}
388
389/// Verify redo log continuity against a tablespace.
390///
391/// Opens the redo log, reads the most recent checkpoint LSN (higher of the
392/// two checkpoint slots), and compares it against the maximum LSN found
393/// across all pages in the tablespace.
394#[cfg(not(target_arch = "wasm32"))]
395pub fn verify_redo_continuity(
396    redo_path: &str,
397    all_pages: &[u8],
398    page_size: u32,
399) -> Result<RedoVerifyResult, IdbError> {
400    use crate::innodb::log::LogFile;
401
402    let mut log = LogFile::open(redo_path)?;
403    let cp0 = log.read_checkpoint(0)?;
404    let cp1 = log.read_checkpoint(1)?;
405    let checkpoint_lsn = cp0.lsn.max(cp1.lsn);
406
407    let ps = page_size as usize;
408    let total_pages = all_pages.len() / ps;
409    let mut max_lsn: u64 = 0;
410
411    for i in 0..total_pages {
412        let page_data = &all_pages[i * ps..(i + 1) * ps];
413        if page_data.iter().all(|&b| b == 0) {
414            continue;
415        }
416        if let Some(header) = FilHeader::parse(page_data) {
417            if header.lsn > max_lsn {
418                max_lsn = header.lsn;
419            }
420        }
421    }
422
423    let covers_tablespace = checkpoint_lsn >= max_lsn;
424    let lsn_gap = if covers_tablespace {
425        0
426    } else {
427        max_lsn - checkpoint_lsn
428    };
429
430    Ok(RedoVerifyResult {
431        redo_file: redo_path.to_string(),
432        checkpoint_lsn,
433        tablespace_max_lsn: max_lsn,
434        covers_tablespace,
435        lsn_gap,
436    })
437}
438
439// ---------------------------------------------------------------------------
440// Backup chain verification (#101)
441// ---------------------------------------------------------------------------
442
443/// Information extracted from a tablespace file for chain ordering.
444#[derive(Debug, Clone, Serialize)]
445pub struct ChainFileInfo {
446    /// Path to the file.
447    pub file: String,
448    /// Space ID from page 0.
449    pub space_id: u32,
450    /// Maximum LSN found across all pages.
451    pub max_lsn: u64,
452    /// Minimum non-zero LSN found.
453    pub min_lsn: u64,
454    /// Total pages in the file.
455    pub total_pages: u64,
456}
457
458/// A gap detected between consecutive files in the chain.
459#[derive(Debug, Clone, Serialize)]
460pub struct ChainGap {
461    /// The file before the gap.
462    pub from_file: String,
463    /// Max LSN of the file before the gap.
464    pub from_max_lsn: u64,
465    /// The file after the gap.
466    pub to_file: String,
467    /// Min LSN of the file after the gap.
468    pub to_min_lsn: u64,
469    /// Size of the LSN gap.
470    pub gap_size: u64,
471}
472
473/// Report of backup chain verification.
474#[derive(Debug, Clone, Serialize)]
475pub struct ChainReport {
476    /// Ordered list of files in the chain (by max LSN).
477    pub files: Vec<ChainFileInfo>,
478    /// Gaps detected between consecutive files.
479    pub gaps: Vec<ChainGap>,
480    /// Whether the chain is contiguous (no gaps).
481    pub contiguous: bool,
482    /// Whether all files share the same space_id.
483    pub consistent_space_id: bool,
484}
485
486/// Extract chain info from a tablespace's raw page data.
487pub fn extract_chain_file_info(all_pages: &[u8], page_size: u32, file: &str) -> ChainFileInfo {
488    let ps = page_size as usize;
489    let total_pages = (all_pages.len() / ps) as u64;
490    let mut max_lsn: u64 = 0;
491    let mut min_lsn: u64 = u64::MAX;
492    let mut space_id: u32 = 0;
493
494    for i in 0..total_pages as usize {
495        let page_data = &all_pages[i * ps..(i + 1) * ps];
496        if page_data.iter().all(|&b| b == 0) {
497            continue;
498        }
499        if let Some(header) = FilHeader::parse(page_data) {
500            if i == 0 {
501                space_id = header.space_id;
502            }
503            if header.lsn > max_lsn {
504                max_lsn = header.lsn;
505            }
506            if header.lsn > 0 && header.lsn < min_lsn {
507                min_lsn = header.lsn;
508            }
509        }
510    }
511
512    if min_lsn == u64::MAX {
513        min_lsn = 0;
514    }
515
516    ChainFileInfo {
517        file: file.to_string(),
518        space_id,
519        max_lsn,
520        min_lsn,
521        total_pages,
522    }
523}
524
525/// Verify a backup chain given pre-extracted file info.
526///
527/// Orders files by max_lsn and checks for LSN gaps between consecutive files.
528/// A gap exists when one file's min_lsn is greater than the previous file's max_lsn.
529pub fn verify_backup_chain(mut files_info: Vec<ChainFileInfo>) -> ChainReport {
530    if files_info.is_empty() {
531        return ChainReport {
532            files: vec![],
533            gaps: vec![],
534            contiguous: true,
535            consistent_space_id: true,
536        };
537    }
538
539    // Sort by max_lsn ascending
540    files_info.sort_by_key(|f| f.max_lsn);
541
542    // Check space_id consistency
543    let first_space_id = files_info[0].space_id;
544    let consistent_space_id = files_info.iter().all(|f| f.space_id == first_space_id);
545
546    // Detect gaps
547    let mut gaps = Vec::new();
548    for pair in files_info.windows(2) {
549        let prev = &pair[0];
550        let next = &pair[1];
551        // If next file's min LSN > prev file's max LSN, there's a gap
552        if next.min_lsn > prev.max_lsn {
553            gaps.push(ChainGap {
554                from_file: prev.file.clone(),
555                from_max_lsn: prev.max_lsn,
556                to_file: next.file.clone(),
557                to_min_lsn: next.min_lsn,
558                gap_size: next.min_lsn - prev.max_lsn,
559            });
560        }
561    }
562
563    let contiguous = gaps.is_empty();
564
565    ChainReport {
566        files: files_info,
567        gaps,
568        contiguous,
569        consistent_space_id,
570    }
571}
572
573// ---------------------------------------------------------------------------
574// Backup metadata verification
575// ---------------------------------------------------------------------------
576
577/// A page whose LSN falls outside the backup checkpoint window.
578#[derive(Debug, Clone, Serialize)]
579pub struct BackupMetaPageIssue {
580    pub page_number: u64,
581    pub lsn: u64,
582    pub page_type: String,
583}
584
585/// Result of verifying tablespace page LSNs against XtraBackup checkpoint metadata.
586#[derive(Debug, Clone, Serialize)]
587pub struct BackupMetaVerifyResult {
588    pub checkpoint_file: String,
589    pub backup_type: String,
590    pub from_lsn: u64,
591    pub to_lsn: u64,
592    pub tablespace_min_lsn: u64,
593    pub tablespace_max_lsn: u64,
594    pub pages_before_window: Vec<BackupMetaPageIssue>,
595    pub pages_after_window: Vec<BackupMetaPageIssue>,
596    pub passed: bool,
597}
598
599/// Verify tablespace page LSNs against an XtraBackup checkpoint file.
600///
601/// Pages with LSN > `to_lsn` are reported as after the backup window.
602/// For full backups (`from_lsn == 0`), pages with non-zero LSN < `from_lsn`
603/// are reported as before the window.  For incremental backups (`from_lsn > 0`),
604/// pages below the window are expected (unmodified since the base) and are not
605/// flagged as failures.
606#[cfg(not(target_arch = "wasm32"))]
607pub fn verify_backup_meta(
608    checkpoint_path: &str,
609    all_pages: &[u8],
610    page_size: u32,
611) -> Result<BackupMetaVerifyResult, crate::IdbError> {
612    use crate::innodb::backup::parse_xtrabackup_checkpoints;
613    use std::path::Path;
614
615    let checkpoint = parse_xtrabackup_checkpoints(Path::new(checkpoint_path))?;
616
617    let ps = page_size as usize;
618    let total_pages = all_pages.len() / ps;
619    let is_incremental = checkpoint.from_lsn > 0;
620
621    let mut min_lsn = u64::MAX;
622    let mut max_lsn = 0u64;
623    let mut pages_before = Vec::new();
624    let mut pages_after = Vec::new();
625
626    for i in 0..total_pages {
627        let offset = i * ps;
628        let page_data = &all_pages[offset..offset + ps];
629
630        // Skip all-zero pages
631        if page_data.iter().all(|&b| b == 0) {
632            continue;
633        }
634
635        if let Some(hdr) = FilHeader::parse(page_data) {
636            let lsn = hdr.lsn;
637            if lsn == 0 {
638                continue;
639            }
640
641            if lsn < min_lsn {
642                min_lsn = lsn;
643            }
644            if lsn > max_lsn {
645                max_lsn = lsn;
646            }
647
648            if lsn > checkpoint.to_lsn {
649                pages_after.push(BackupMetaPageIssue {
650                    page_number: i as u64,
651                    lsn,
652                    page_type: hdr.page_type.name().to_string(),
653                });
654            } else if !is_incremental && lsn < checkpoint.from_lsn {
655                // For full backups, pages before the window are unexpected.
656                // For incremental backups, unmodified pages legitimately
657                // have LSNs below from_lsn and are not flagged.
658                pages_before.push(BackupMetaPageIssue {
659                    page_number: i as u64,
660                    lsn,
661                    page_type: hdr.page_type.name().to_string(),
662                });
663            }
664        }
665    }
666
667    // If no non-zero LSN pages were found, set min/max to 0
668    if min_lsn == u64::MAX {
669        min_lsn = 0;
670    }
671
672    let passed = pages_before.is_empty() && pages_after.is_empty();
673
674    Ok(BackupMetaVerifyResult {
675        checkpoint_file: checkpoint_path.to_string(),
676        backup_type: checkpoint.backup_type,
677        from_lsn: checkpoint.from_lsn,
678        to_lsn: checkpoint.to_lsn,
679        tablespace_min_lsn: min_lsn,
680        tablespace_max_lsn: max_lsn,
681        pages_before_window: pages_before,
682        pages_after_window: pages_after,
683        passed,
684    })
685}