// idb/innodb/simulate.rs
1//! Crash recovery simulation.
2//!
3//! Models InnoDB's `innodb_force_recovery` levels 1-6 to predict data
4//! recoverability at each level without actually attempting recovery. Classifies
5//! every page by the minimum recovery level needed to access it, then aggregates
6//! per-index and per-table impact estimates.
7//!
8//! # Recovery levels
9//!
10//! | Level | MySQL constant | Effect |
11//! |-------|---------------|--------|
12//! | 0 | (normal) | Full crash recovery |
13//! | 1 | SRV_FORCE_IGNORE_CORRUPT | Skip corrupt pages |
14//! | 2 | SRV_FORCE_NO_BACKGROUND | Prevent background threads |
15//! | 3 | SRV_FORCE_NO_TRX_UNDO | Skip transaction rollbacks |
16//! | 4 | SRV_FORCE_NO_IBUF_MERGE | Skip insert buffer merge |
17//! | 5 | SRV_FORCE_NO_UNDO_LOG_SCAN | Skip undo log scan |
18//! | 6 | SRV_FORCE_NO_LOG_REDO | Skip redo log application |
19
20use std::collections::{BTreeMap, HashMap};
21
22use serde::Serialize;
23
24use crate::innodb::checksum::{validate_checksum, validate_lsn};
25use crate::innodb::corruption::{classify_corruption, CorruptionPattern};
26use crate::innodb::index::IndexHeader;
27use crate::innodb::page::FilHeader;
28use crate::innodb::page_types::PageType;
29use crate::innodb::sdi;
30use crate::innodb::tablespace::Tablespace;
31use crate::IdbError;
32
33// ---------------------------------------------------------------------------
34// Recovery level constants
35// ---------------------------------------------------------------------------
36
/// Recovery level descriptions matching MySQL `innodb_force_recovery` values.
///
/// Each entry is `(level, mysql_constant_name, human_description)`, ordered
/// by level so that `LEVEL_INFO[n]` corresponds to level `n`.
const LEVEL_INFO: [(u8, &str, &str); 7] = [
    (0, "Normal recovery", "Full crash recovery; all checks applied"),
    (1, "SRV_FORCE_IGNORE_CORRUPT", "Skip corrupt pages during recovery"),
    (
        2,
        "SRV_FORCE_NO_BACKGROUND",
        "Prevent background threads (purge, insert buffer merge thread)",
    ),
    (3, "SRV_FORCE_NO_TRX_UNDO", "Skip transaction rollbacks after recovery"),
    (4, "SRV_FORCE_NO_IBUF_MERGE", "Skip insert buffer merge operations"),
    (
        5,
        "SRV_FORCE_NO_UNDO_LOG_SCAN",
        "Skip undo log scan; treats incomplete transactions as committed",
    ),
    (
        6,
        "SRV_FORCE_NO_LOG_REDO",
        "Skip redo log application; tablespace opened as-is",
    ),
];
75
76// ---------------------------------------------------------------------------
77// Data structures
78// ---------------------------------------------------------------------------
79
/// Per-page recovery classification.
///
/// One entry per scanned page; only emitted in the report when verbose mode
/// is enabled. Optional fields are omitted from JSON when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct PageRecoveryStatus {
    /// Page number within the tablespace.
    pub page_number: u64,
    /// Page type string (e.g., "INDEX", "UNDO_LOG", "FSP_HDR").
    pub page_type: String,
    /// InnoDB index ID (only for INDEX pages).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index_id: Option<u64>,
    /// Whether the stored checksum is valid.
    pub checksum_valid: bool,
    /// Whether header and trailer LSN values are consistent.
    pub lsn_consistent: bool,
    /// Corruption pattern classification (only for corrupt pages).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub corruption_pattern: Option<String>,
    /// Number of user records on the page (INDEX pages only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub record_count: Option<u16>,
    /// B+Tree level (0 = leaf; INDEX pages only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub btree_level: Option<u16>,
    /// Minimum `innodb_force_recovery` level required to access this page.
    pub min_recovery_level: u8,
}
106
/// Per-index impact summary.
#[derive(Debug, Clone, Serialize)]
pub struct IndexImpact {
    /// InnoDB internal index ID.
    pub index_id: u64,
    /// Index name from SDI metadata (if available).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index_name: Option<String>,
    /// Whether this is the clustered (primary) index.
    /// Heuristic: lowest index ID per table — TODO confirm against SDI flags.
    pub is_clustered: bool,
    /// Total pages belonging to this index.
    pub total_pages: u64,
    /// Pages with valid checksums.
    pub intact_pages: u64,
    /// Pages with invalid checksums.
    pub corrupt_pages: u64,
    /// All-zero pages belonging to this index.
    pub empty_pages: u64,
    /// Total user records across intact leaf pages.
    pub total_records: u64,
    /// Records at risk (on corrupt leaf pages) at each recovery level.
    /// Keyed by level 0-6; the same estimate is stored for every level.
    pub lost_records_by_level: BTreeMap<u8, u64>,
}
130
/// Per-table impact summary.
///
/// Indexes without SDI metadata are grouped into a single entry with
/// `table_name == None`.
#[derive(Debug, Clone, Serialize)]
pub struct TableImpact {
    /// Table name from SDI metadata (if available).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_name: Option<String>,
    /// Index-level impacts for this table.
    pub indexes: Vec<IndexImpact>,
    /// Data loss estimates per recovery level.
    pub data_loss_by_level: BTreeMap<u8, DataLossEstimate>,
}
142
/// Data loss estimate at a specific recovery level.
#[derive(Debug, Clone, Serialize)]
pub struct DataLossEstimate {
    /// Recovery level (0-6). Duplicates the `data_loss_by_level` map key so
    /// the serialized value is self-describing.
    pub level: u8,
    /// Whether the table is accessible at this level.
    pub accessible: bool,
    /// Number of corrupt pages that would be skipped.
    pub corrupt_pages_skipped: u64,
    /// Estimated number of records on corrupt leaf pages.
    pub records_at_risk: u64,
    /// Percentage of total records at risk (0.0-100.0), rounded to 2 decimals.
    pub pct_data_at_risk: f64,
}
157
/// Assessment of a single recovery level across the entire tablespace.
#[derive(Debug, Clone, Serialize)]
pub struct LevelAssessment {
    /// Recovery level (0-6).
    pub level: u8,
    /// MySQL constant name (e.g., "SRV_FORCE_IGNORE_CORRUPT").
    pub name: &'static str,
    /// Human-readable description.
    pub description: &'static str,
    /// Number of tables accessible at this level.
    pub tables_accessible: u64,
    /// Total number of tables.
    pub total_tables: u64,
    /// Number of tables that would lose some data.
    pub tables_with_data_loss: u64,
    /// Total records at risk across all tables.
    pub total_records_at_risk: u64,
    /// Overall percentage of data at risk, rounded to 2 decimals.
    pub pct_overall_risk: f64,
    /// Level-specific warnings (omitted from JSON when empty).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
181
/// Recovery plan with recommended level.
#[derive(Debug, Clone, Serialize)]
pub struct RecoveryPlan {
    /// Recommended `innodb_force_recovery` level (0-6).
    pub recommended_level: u8,
    /// Human-readable rationale for the recommendation.
    pub rationale: String,
    /// Per-level assessment, one entry per level 0-6 in ascending order.
    pub levels: Vec<LevelAssessment>,
}
192
/// Page count summary.
#[derive(Debug, Clone, Serialize)]
pub struct PageSummary {
    /// Pages with a valid checksum.
    pub intact: u64,
    /// Non-empty pages with an invalid checksum.
    pub corrupt: u64,
    /// All-zero (allocated but unused) pages.
    pub empty: u64,
    /// Pages that could not be read at all (currently always 0; the scan
    /// aborts on read errors rather than counting them).
    pub unreadable: u64,
}
201
/// Complete simulation report.
///
/// Top-level serializable result of [`simulate_recovery`].
#[derive(Debug, Clone, Serialize)]
pub struct SimulationReport {
    /// Path to the analyzed file.
    pub file: String,
    /// Page size in bytes.
    pub page_size: u32,
    /// Total number of pages.
    pub total_pages: u64,
    /// Database vendor (MySQL, Percona, MariaDB).
    pub vendor: String,
    /// Page count summary.
    pub page_summary: PageSummary,
    /// Per-page details (only populated in verbose mode; omitted when empty).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub pages: Vec<PageRecoveryStatus>,
    /// Per-table impact analysis.
    pub tables: Vec<TableImpact>,
    /// Recovery plan with recommendation.
    pub plan: RecoveryPlan,
}
223
224// ---------------------------------------------------------------------------
225// Page-level classification
226// ---------------------------------------------------------------------------
227
228/// Classify the minimum recovery level needed to access a page.
229///
230/// Returns the `innodb_force_recovery` level (0-6) required for MySQL to
231/// skip over or tolerate any corruption on this page.
232fn classify_page_recovery_level(page_type: &PageType, checksum_valid: bool, is_empty: bool) -> u8 {
233    // Valid or empty pages are accessible at level 0
234    if checksum_valid || is_empty {
235        return 0;
236    }
237
238    // Corrupt page — level depends on page type
239    match page_type {
240        // FSP_HDR (page 0) and INODE contain tablespace/segment metadata;
241        // corruption here is catastrophic
242        PageType::FspHdr | PageType::Inode => 6,
243
244        // Extent descriptors — needed for space management
245        PageType::Xdes => 5,
246
247        // Insert buffer bitmap — needed for ibuf merge
248        PageType::IbufBitmap => 4,
249
250        // Undo log pages — needed for transaction rollback
251        PageType::UndoLog => 3,
252
253        // Data pages, SDI, LOB — skippable at level 1
254        PageType::Index
255        | PageType::Sdi
256        | PageType::Blob
257        | PageType::ZBlob
258        | PageType::ZBlob2
259        | PageType::LobFirst
260        | PageType::LobData
261        | PageType::LobIndex
262        | PageType::ZlobFirst
263        | PageType::ZlobData
264        | PageType::ZlobIndex
265        | PageType::ZlobFrag
266        | PageType::ZlobFragEntry
267        | PageType::SdiBlob
268        | PageType::SdiZblob
269        | PageType::Rtree => 1,
270
271        // Anything else (system pages, unknown types) — assume level 1
272        _ => 1,
273    }
274}
275
276// ---------------------------------------------------------------------------
277// Core simulation
278// ---------------------------------------------------------------------------
279
/// Internal page scan result used before aggregation.
struct ScannedPage {
    // Page number within the tablespace.
    page_number: u64,
    // Display name of the page type ("ALLOCATED" for all-zero pages).
    page_type_name: String,
    // Index ID parsed from the INDEX header (INDEX pages only).
    index_id: Option<u64>,
    // Whether the stored checksum validated.
    checksum_valid: bool,
    // Whether header/trailer LSNs agree.
    lsn_consistent: bool,
    // Corruption classification; set only when the checksum failed.
    corruption_pattern: Option<CorruptionPattern>,
    // User record count (INDEX pages with a parseable header only).
    record_count: Option<u16>,
    // B+Tree level, 0 = leaf (INDEX pages with a parseable header only).
    btree_level: Option<u16>,
    // True for all-zero pages.
    is_empty: bool,
    // Minimum force-recovery level needed to access this page.
    min_recovery_level: u8,
}
293
/// Run crash recovery simulation on a tablespace.
///
/// Scans every page, classifies recoverability at each level, aggregates
/// per-index and per-table impact, and builds a recovery plan with an
/// optimal level recommendation.
///
/// Pass `sdi_json` (from an SDI record with `sdi_type == 1`) to enable
/// index name and table name resolution. If `None`, indexes are identified
/// by ID only.
///
/// # Errors
///
/// Propagates any [`IdbError`] raised while iterating pages via
/// `Tablespace::for_each_page`.
pub fn simulate_recovery(
    ts: &mut Tablespace,
    sdi_json: Option<&str>,
    file_path: &str,
    verbose: bool,
) -> Result<SimulationReport, IdbError> {
    let page_size = ts.page_size();
    let total_pages = ts.page_count();
    let vendor = ts.vendor_info().vendor.to_string();
    // Cloned up front so the page-scan closure can use it while `ts` is
    // mutably borrowed by `for_each_page`.
    let vendor_info = ts.vendor_info().clone();

    // Phase 1: Page scan
    let mut scanned: Vec<ScannedPage> = Vec::with_capacity(total_pages as usize);

    ts.for_each_page(|page_num, page_data| {
        let is_empty = page_data.iter().all(|&b| b == 0);

        // All-zero pages are allocated-but-unused: always accessible, no
        // header to parse.
        if is_empty {
            scanned.push(ScannedPage {
                page_number: page_num,
                page_type_name: "ALLOCATED".to_string(),
                index_id: None,
                checksum_valid: true,
                lsn_consistent: true,
                corruption_pattern: None,
                record_count: None,
                btree_level: None,
                is_empty: true,
                min_recovery_level: 0,
            });
            return Ok(());
        }

        let header = FilHeader::parse(page_data);
        let (page_type, page_type_name) = match &header {
            Some(h) => (h.page_type, h.page_type.name().to_string()),
            None => (PageType::Unknown(0), "UNKNOWN".to_string()),
        };

        let checksum_result = validate_checksum(page_data, page_size, Some(&vendor_info));
        let checksum_valid = checksum_result.valid;
        let lsn_consistent = validate_lsn(page_data, page_size);

        // Only classify a corruption pattern for pages that failed the checksum.
        let corruption_pattern = if !checksum_valid {
            Some(classify_corruption(page_data, page_size))
        } else {
            None
        };

        // Parse INDEX header for index_id, n_recs, level
        let (index_id, record_count, btree_level) = if page_type == PageType::Index {
            match IndexHeader::parse(page_data) {
                Some(idx_hdr) => (
                    Some(idx_hdr.index_id),
                    Some(idx_hdr.n_recs),
                    Some(idx_hdr.level),
                ),
                None => (None, None, None),
            }
        } else {
            (None, None, None)
        };

        // `is_empty` is hard-coded false: empty pages returned early above.
        let min_recovery_level = classify_page_recovery_level(&page_type, checksum_valid, false);

        scanned.push(ScannedPage {
            page_number: page_num,
            page_type_name,
            index_id,
            checksum_valid,
            lsn_consistent,
            corruption_pattern,
            record_count,
            btree_level,
            is_empty: false,
            min_recovery_level,
        });

        Ok(())
    })?;

    // Build verbose page list if requested
    let pages = if verbose {
        scanned
            .iter()
            .map(|s| PageRecoveryStatus {
                page_number: s.page_number,
                page_type: s.page_type_name.clone(),
                index_id: s.index_id,
                checksum_valid: s.checksum_valid,
                lsn_consistent: s.lsn_consistent,
                corruption_pattern: s.corruption_pattern.as_ref().map(|p| p.name().to_string()),
                record_count: s.record_count,
                btree_level: s.btree_level,
                min_recovery_level: s.min_recovery_level,
            })
            .collect()
    } else {
        Vec::new()
    };

    // Page summary: each page is exactly one of empty / intact / corrupt.
    let mut intact = 0u64;
    let mut corrupt = 0u64;
    let mut empty = 0u64;
    for s in &scanned {
        if s.is_empty {
            empty += 1;
        } else if s.checksum_valid {
            intact += 1;
        } else {
            corrupt += 1;
        }
    }
    let page_summary = PageSummary {
        intact,
        corrupt,
        empty,
        unreadable: 0,
    };

    // Phase 2: Index/table aggregation
    let index_name_map = sdi_json.map(sdi::build_index_name_map).unwrap_or_default();
    let index_table_map = sdi_json.map(sdi::build_index_table_map).unwrap_or_default();

    // Group pages by index_id (only INDEX pages carry one)
    let mut index_pages: HashMap<u64, Vec<&ScannedPage>> = HashMap::new();
    for s in &scanned {
        if let Some(idx_id) = s.index_id {
            index_pages.entry(idx_id).or_default().push(s);
        }
    }

    // Determine which index_id is the clustered index per table
    // (lowest index_id per table name is assumed to be clustered)
    let mut table_min_index: HashMap<String, u64> = HashMap::new();
    for (&idx_id, table_name) in &index_table_map {
        let entry = table_min_index.entry(table_name.clone()).or_insert(idx_id);
        if idx_id < *entry {
            *entry = idx_id;
        }
    }
    // For indexes without SDI, group them by lowest index_id
    let mut unknown_min_index: Option<u64> = None;
    for &idx_id in index_pages.keys() {
        if !index_table_map.contains_key(&idx_id) {
            match unknown_min_index {
                Some(ref mut min) => {
                    if idx_id < *min {
                        *min = idx_id;
                    }
                }
                None => unknown_min_index = Some(idx_id),
            }
        }
    }

    // Build per-index impacts, in ascending index-id order for stable output.
    let mut index_impacts: Vec<IndexImpact> = Vec::new();
    let mut sorted_index_ids: Vec<u64> = index_pages.keys().copied().collect();
    sorted_index_ids.sort();

    for idx_id in sorted_index_ids {
        let pages_for_index = &index_pages[&idx_id];
        let mut total = 0u64;
        let mut intact_count = 0u64;
        let mut corrupt_count = 0u64;
        let mut empty_count = 0u64;
        let mut total_records = 0u64;
        let mut corrupt_leaf_records = 0u64;

        for s in pages_for_index {
            total += 1;
            if s.is_empty {
                empty_count += 1;
            } else if s.checksum_valid {
                intact_count += 1;
                // Count records on intact leaf pages (level 0)
                if s.btree_level == Some(0) {
                    total_records += s.record_count.unwrap_or(0) as u64;
                }
            } else {
                corrupt_count += 1;
                // Estimate records on corrupt leaf pages using average from intact pages.
                // A page with no parseable level (btree_level == None) is treated
                // as a possible leaf, pessimistically.
                if s.btree_level == Some(0) || s.btree_level.is_none() {
                    // Use record_count if header was parseable, else we'll estimate later
                    corrupt_leaf_records += s.record_count.unwrap_or(0) as u64;
                }
            }
        }

        // If corrupt leaf pages have no record count (header unparseable),
        // estimate from average of intact leaf pages
        let intact_leaf_count = pages_for_index
            .iter()
            .filter(|s| s.checksum_valid && s.btree_level == Some(0))
            .count() as u64;
        let avg_records_per_leaf = if intact_leaf_count > 0 {
            total_records / intact_leaf_count
        } else {
            0
        };
        let corrupt_leaf_no_header = pages_for_index
            .iter()
            .filter(|s| {
                !s.checksum_valid
                    && !s.is_empty
                    && s.record_count.is_none()
                    && (s.btree_level == Some(0) || s.btree_level.is_none())
            })
            .count() as u64;
        corrupt_leaf_records += corrupt_leaf_no_header * avg_records_per_leaf;

        // Records at risk by level: at level 0, corrupt pages make the table
        // inaccessible (communicated via accessible=false in DataLossEstimate);
        // at level 1+, corrupt INDEX pages are skipped (data lost but DB starts).
        // The lost count is always the corrupt-leaf record estimate — the
        // accessible flag distinguishes "table won't open" from "some rows lost".
        let mut lost_by_level = BTreeMap::new();
        let records_at_risk = corrupt_leaf_records;
        for lvl in 0..=6 {
            lost_by_level.insert(lvl, records_at_risk);
        }

        let is_clustered = if let Some(table_name) = index_table_map.get(&idx_id) {
            table_min_index.get(table_name) == Some(&idx_id)
        } else {
            unknown_min_index == Some(idx_id)
        };

        index_impacts.push(IndexImpact {
            index_id: idx_id,
            index_name: index_name_map.get(&idx_id).cloned(),
            is_clustered,
            total_pages: total,
            intact_pages: intact_count,
            corrupt_pages: corrupt_count,
            empty_pages: empty_count,
            total_records,
            lost_records_by_level: lost_by_level,
        });
    }

    // Group indexes into tables; BTreeMap keeps table output name-sorted.
    let mut table_groups: BTreeMap<String, Vec<IndexImpact>> = BTreeMap::new();
    let mut unknown_indexes: Vec<IndexImpact> = Vec::new();

    for impact in index_impacts {
        if let Some(table_name) = index_table_map.get(&impact.index_id) {
            table_groups
                .entry(table_name.clone())
                .or_default()
                .push(impact);
        } else {
            unknown_indexes.push(impact);
        }
    }

    let mut tables: Vec<TableImpact> = Vec::new();

    for (table_name, indexes) in table_groups {
        let data_loss = build_table_data_loss(&indexes);
        tables.push(TableImpact {
            table_name: Some(table_name),
            indexes,
            data_loss_by_level: data_loss,
        });
    }

    // Indexes with no SDI mapping are reported as one anonymous table.
    if !unknown_indexes.is_empty() {
        let data_loss = build_table_data_loss(&unknown_indexes);
        tables.push(TableImpact {
            table_name: None,
            indexes: unknown_indexes,
            data_loss_by_level: data_loss,
        });
    }

    // Phase 3: Recovery plan
    let plan = build_recovery_plan(&tables, &scanned);

    Ok(SimulationReport {
        file: file_path.to_string(),
        page_size,
        total_pages,
        vendor,
        page_summary,
        pages,
        tables,
        plan,
    })
}
595
596/// Build per-level data loss estimates for a table from its index impacts.
597fn build_table_data_loss(indexes: &[IndexImpact]) -> BTreeMap<u8, DataLossEstimate> {
598    let mut estimates = BTreeMap::new();
599
600    // Total records across all indexes' intact leaf pages
601    let total_records: u64 = indexes.iter().map(|i| i.total_records).sum();
602    let total_corrupt: u64 = indexes.iter().map(|i| i.corrupt_pages).sum();
603
604    // Check if any critical (non-INDEX) corruption blocks access
605    let has_critical_corruption = indexes.iter().any(|i| i.corrupt_pages > 0);
606
607    for level in 0..=6u8 {
608        let records_at_risk: u64 = indexes
609            .iter()
610            .map(|i| i.lost_records_by_level.get(&level).copied().unwrap_or(0))
611            .sum();
612
613        let pct = if total_records + records_at_risk > 0 {
614            (records_at_risk as f64 / (total_records + records_at_risk) as f64) * 100.0
615        } else if total_corrupt > 0 {
616            100.0
617        } else {
618            0.0
619        };
620
621        // At level 0, table is inaccessible if any pages are corrupt
622        let accessible = if level == 0 {
623            !has_critical_corruption
624        } else {
625            true
626        };
627
628        let corrupt_skipped = if level == 0 { 0 } else { total_corrupt };
629
630        estimates.insert(
631            level,
632            DataLossEstimate {
633                level,
634                accessible,
635                corrupt_pages_skipped: corrupt_skipped,
636                records_at_risk,
637                pct_data_at_risk: (pct * 100.0).round() / 100.0,
638            },
639        );
640    }
641
642    estimates
643}
644
/// Build the recovery plan with level assessments and recommendation.
///
/// Produces one [`LevelAssessment`] per level 0-6 (in order), then picks the
/// lowest level at which every table is accessible and all page-level
/// requirements are met.
fn build_recovery_plan(tables: &[TableImpact], scanned: &[ScannedPage]) -> RecoveryPlan {
    let total_tables = tables.len() as u64;

    // Find the maximum min_recovery_level across all pages — the floor for
    // any recommendation.
    let max_level_needed = scanned
        .iter()
        .map(|s| s.min_recovery_level)
        .max()
        .unwrap_or(0);

    // Total records across all tables (intact leaf pages only).
    let total_records: u64 = tables
        .iter()
        .flat_map(|t| &t.indexes)
        .map(|i| i.total_records)
        .sum();

    let mut levels = Vec::with_capacity(7);

    for level in 0..=6u8 {
        let (name, description) = LEVEL_INFO
            .iter()
            .find(|(l, _, _)| *l == level)
            .map(|(_, n, d)| (*n, *d))
            .unwrap_or(("Unknown", "Unknown level"));

        let tables_accessible = tables
            .iter()
            .filter(|t| {
                t.data_loss_by_level
                    .get(&level)
                    .map(|e| e.accessible)
                    .unwrap_or(false)
            })
            .count() as u64;

        let tables_with_loss = tables
            .iter()
            .filter(|t| {
                t.data_loss_by_level
                    .get(&level)
                    .map(|e| e.records_at_risk > 0)
                    .unwrap_or(false)
            })
            .count() as u64;

        let records_at_risk: u64 = tables
            .iter()
            .filter_map(|t| t.data_loss_by_level.get(&level))
            .map(|e| e.records_at_risk)
            .sum();

        // Risk percentage is computed against records-at-risk plus surviving
        // records, i.e. the estimated pre-corruption total.
        let total_including_corrupt = total_records + records_at_risk;
        let pct_risk = if total_including_corrupt > 0 {
            (records_at_risk as f64 / total_including_corrupt as f64) * 100.0
        } else {
            0.0
        };

        // Warnings are cumulative: each higher level inherits the risks of
        // the lower ones it implies.
        let mut warnings = Vec::new();
        if level >= 3 {
            warnings.push("Uncommitted transactions will not be rolled back".to_string());
        }
        if level >= 4 {
            warnings
                .push("Insert buffer merge skipped; secondary indexes may be stale".to_string());
        }
        if level >= 5 {
            warnings.push("Undo log scan skipped; transaction state unknown".to_string());
        }
        if level >= 6 {
            warnings.push("Redo log replay skipped; pages may reflect pre-crash state".to_string());
        }

        levels.push(LevelAssessment {
            level,
            name,
            description,
            tables_accessible,
            total_tables,
            tables_with_data_loss: tables_with_loss,
            total_records_at_risk: records_at_risk,
            // Round to two decimal places for presentation.
            pct_overall_risk: (pct_risk * 100.0).round() / 100.0,
            warnings,
        });
    }

    // Recommendation: lowest level where all tables are accessible AND that
    // satisfies the page-level floor; fall back to 6 if nothing qualifies.
    let recommended = if max_level_needed == 0 {
        0
    } else {
        levels
            .iter()
            .find(|l| l.tables_accessible == total_tables && l.level >= max_level_needed)
            .map(|l| l.level)
            .unwrap_or(6)
    };

    let rationale = build_rationale(recommended, &levels, scanned);

    RecoveryPlan {
        recommended_level: recommended,
        rationale,
        levels,
    }
}
752
753/// Generate a human-readable rationale for the recommended level.
754fn build_rationale(recommended: u8, levels: &[LevelAssessment], scanned: &[ScannedPage]) -> String {
755    let corrupt_count = scanned
756        .iter()
757        .filter(|s| !s.checksum_valid && !s.is_empty)
758        .count();
759
760    if recommended == 0 {
761        return "No corrupt pages detected. Normal recovery (level 0) is sufficient.".to_string();
762    }
763
764    let level_info = &levels[recommended as usize];
765    let mut parts = Vec::new();
766
767    parts.push(format!(
768        "Level {} ({}) recommended.",
769        recommended, level_info.name
770    ));
771
772    if corrupt_count > 0 {
773        parts.push(format!(
774            "{} corrupt page{} detected.",
775            corrupt_count,
776            if corrupt_count == 1 { "" } else { "s" }
777        ));
778    }
779
780    if level_info.tables_accessible == level_info.total_tables {
781        parts.push("All tables accessible at this level.".to_string());
782    } else {
783        parts.push(format!(
784            "{}/{} tables accessible.",
785            level_info.tables_accessible, level_info.total_tables
786        ));
787    }
788
789    if level_info.total_records_at_risk > 0 {
790        parts.push(format!(
791            "~{} records at risk ({:.1}% of data).",
792            level_info.total_records_at_risk, level_info.pct_overall_risk
793        ));
794    }
795
796    parts.push(
797        "Based on static file analysis; redo log replay may recover additional pages.".to_string(),
798    );
799
800    parts.join(" ")
801}
802
803// ---------------------------------------------------------------------------
804// Tests
805// ---------------------------------------------------------------------------
806
807#[cfg(test)]
808mod tests {
809    use super::*;
810    use crate::innodb::constants::*;
811    use byteorder::{BigEndian, ByteOrder};
812    use std::io::Write;
813    use tempfile::NamedTempFile;
814
    /// Build a minimal valid page with correct CRC-32C checksum.
    ///
    /// Writes the FIL header fields (offset, prev/next = FIL_NULL, LSN, type,
    /// space id), a trailer LSN matching the header, then stamps the checksum.
    fn build_page(page_number: u32, page_type: u16, page_size: u32) -> Vec<u8> {
        let ps = page_size as usize;
        let mut page = vec![0u8; ps];

        // FIL header
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_number);
        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], 0xFFFFFFFF);
        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], 0xFFFFFFFF);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 1000 + page_number as u64);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], page_type);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], 1);

        // FIL trailer — LSN low 32 bits must match header
        let lsn = 1000 + page_number as u64;
        BigEndian::write_u32(&mut page[ps - 4..], lsn as u32);

        // CRC-32C checksum
        stamp_crc32c(&mut page, ps);

        page
    }
837
    /// Build an INDEX page with index_id, n_recs, and level.
    ///
    /// Starts from a valid base page of type 17855 (FIL_PAGE_INDEX), fills in
    /// the page header fields the parser reads, and re-stamps the checksum.
    fn build_index_page(
        page_number: u32,
        index_id: u64,
        n_recs: u16,
        level: u16,
        page_size: u32,
    ) -> Vec<u8> {
        let ps = page_size as usize;
        let mut page = build_page(page_number, 17855, page_size); // INDEX = 17855

        let base = FIL_PAGE_DATA;
        BigEndian::write_u16(&mut page[base + PAGE_N_RECS..], n_recs);
        // High bit of PAGE_N_HEAP marks the compact record format;
        // heap count = user records + 2 system records (infimum/supremum).
        BigEndian::write_u16(&mut page[base + PAGE_N_HEAP..], 0x8000 | (n_recs + 2)); // compact flag
        BigEndian::write_u16(&mut page[base + PAGE_HEAP_TOP..], 200); // arbitrary
        BigEndian::write_u16(&mut page[base + PAGE_LEVEL..], level);
        BigEndian::write_u64(&mut page[base + PAGE_INDEX_ID..], index_id);

        // Re-stamp checksum after INDEX header changes
        stamp_crc32c(&mut page, ps);

        page
    }
861
    /// Compute and write CRC-32C checksum for a page.
    ///
    /// Follows InnoDB's crc32c scheme: checksum the header bytes [4..26) and
    /// the body [38..page_size-8), XOR the two, then store the result in the
    /// first 4 bytes and in the trailer's checksum slot.
    fn stamp_crc32c(page: &mut [u8], page_size: usize) {
        let crc1 = crc32c::crc32c(&page[4..26]);
        let crc2 = crc32c::crc32c(&page[38..page_size - 8]);
        let checksum = crc1 ^ crc2;
        BigEndian::write_u32(&mut page[0..4], checksum);
        // Trailer checksum
        BigEndian::write_u32(&mut page[page_size - 8..page_size - 4], checksum);
    }
871
    /// Build a valid FSP_HDR page (page 0).
    ///
    /// Sets the FSP header's space id, size-in-pages, and flags, then
    /// re-stamps the checksum.
    fn build_fsp_page(page_size: u32, total_pages: u32) -> Vec<u8> {
        let mut page = build_page(0, 8, page_size); // FSP_HDR = 8
        let base = FIL_PAGE_DATA;
        // space_id
        BigEndian::write_u32(&mut page[base..], 1);
        // size (pages)
        BigEndian::write_u32(&mut page[base + 8..], total_pages);
        // flags — encode page size (for auto-detection to work, we set the proper flags)
        // For 16K default, flags can be 0
        BigEndian::write_u32(&mut page[base + 16..], 0);

        stamp_crc32c(&mut page, page_size as usize);
        page
    }
887
888    /// Write a tablespace file from page buffers.
889    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
890        let mut file = NamedTempFile::new().unwrap();
891        for page in pages {
892            file.write_all(page).unwrap();
893        }
894        file.flush().unwrap();
895        file
896    }
897
898    #[test]
899    fn test_classify_level_valid_page() {
900        assert_eq!(
901            classify_page_recovery_level(&PageType::Index, true, false),
902            0
903        );
904    }
905
906    #[test]
907    fn test_classify_level_empty_page() {
908        assert_eq!(
909            classify_page_recovery_level(&PageType::Allocated, false, true),
910            0
911        );
912    }
913
914    #[test]
915    fn test_classify_level_corrupt_index() {
916        assert_eq!(
917            classify_page_recovery_level(&PageType::Index, false, false),
918            1
919        );
920    }
921
922    #[test]
923    fn test_classify_level_corrupt_undo() {
924        assert_eq!(
925            classify_page_recovery_level(&PageType::UndoLog, false, false),
926            3
927        );
928    }
929
930    #[test]
931    fn test_classify_level_corrupt_ibuf() {
932        assert_eq!(
933            classify_page_recovery_level(&PageType::IbufBitmap, false, false),
934            4
935        );
936    }
937
938    #[test]
939    fn test_classify_level_corrupt_xdes() {
940        assert_eq!(
941            classify_page_recovery_level(&PageType::Xdes, false, false),
942            5
943        );
944    }
945
946    #[test]
947    fn test_classify_level_corrupt_fsp_hdr() {
948        assert_eq!(
949            classify_page_recovery_level(&PageType::FspHdr, false, false),
950            6
951        );
952    }
953
954    #[test]
955    fn test_classify_level_corrupt_inode() {
956        assert_eq!(
957            classify_page_recovery_level(&PageType::Inode, false, false),
958            6
959        );
960    }
961
962    #[test]
963    fn test_all_valid_pages_level_zero() {
964        let page_size = 16384u32;
965        let fsp = build_fsp_page(page_size, 4);
966        let idx1 = build_index_page(1, 100, 50, 0, page_size);
967        let idx2 = build_index_page(2, 100, 45, 0, page_size);
968        let idx3 = build_index_page(3, 101, 30, 0, page_size);
969        let file = write_tablespace(&[fsp, idx1, idx2, idx3]);
970
971        let mut ts = Tablespace::open(file.path().to_str().unwrap()).unwrap();
972        let report =
973            simulate_recovery(&mut ts, None, file.path().to_str().unwrap(), false).unwrap();
974
975        assert_eq!(report.plan.recommended_level, 0);
976        assert_eq!(report.page_summary.corrupt, 0);
977        assert_eq!(report.page_summary.intact, 4);
978    }
979
980    #[test]
981    fn test_corrupt_index_page_needs_level_one() {
982        let page_size = 16384u32;
983        let fsp = build_fsp_page(page_size, 3);
984        let idx1 = build_index_page(1, 100, 50, 0, page_size);
985        let mut idx2 = build_index_page(2, 100, 40, 0, page_size);
986        // Corrupt the checksum
987        idx2[0] ^= 0xFF;
988        let file = write_tablespace(&[fsp, idx1, idx2]);
989
990        let mut ts = Tablespace::open(file.path().to_str().unwrap()).unwrap();
991        let report = simulate_recovery(&mut ts, None, file.path().to_str().unwrap(), true).unwrap();
992
993        assert_eq!(report.plan.recommended_level, 1);
994        assert_eq!(report.page_summary.corrupt, 1);
995        // Verify page 2 needs level 1
996        let page2 = report.pages.iter().find(|p| p.page_number == 2).unwrap();
997        assert_eq!(page2.min_recovery_level, 1);
998    }
999
1000    #[test]
1001    fn test_corrupt_fsp_hdr_needs_level_six() {
1002        let page_size = 16384u32;
1003        let mut fsp = build_fsp_page(page_size, 2);
1004        fsp[0] ^= 0xFF; // corrupt checksum
1005        let idx1 = build_index_page(1, 100, 50, 0, page_size);
1006        let file = write_tablespace(&[fsp, idx1]);
1007
1008        let mut ts =
1009            Tablespace::open_with_page_size(file.path().to_str().unwrap(), page_size).unwrap();
1010        let report = simulate_recovery(&mut ts, None, file.path().to_str().unwrap(), true).unwrap();
1011
1012        assert_eq!(report.plan.recommended_level, 6);
1013        let page0 = report.pages.iter().find(|p| p.page_number == 0).unwrap();
1014        assert_eq!(page0.min_recovery_level, 6);
1015    }
1016
1017    #[test]
1018    fn test_empty_pages_no_impact() {
1019        let page_size = 16384u32;
1020        let fsp = build_fsp_page(page_size, 3);
1021        let empty = vec![0u8; page_size as usize];
1022        let idx = build_index_page(2, 100, 50, 0, page_size);
1023        let file = write_tablespace(&[fsp, empty, idx]);
1024
1025        let mut ts = Tablespace::open(file.path().to_str().unwrap()).unwrap();
1026        let report =
1027            simulate_recovery(&mut ts, None, file.path().to_str().unwrap(), false).unwrap();
1028
1029        assert_eq!(report.plan.recommended_level, 0);
1030        assert_eq!(report.page_summary.empty, 1);
1031        assert_eq!(report.page_summary.intact, 2);
1032    }
1033
1034    #[test]
1035    fn test_multiple_indexes_independent() {
1036        let page_size = 16384u32;
1037        let fsp = build_fsp_page(page_size, 4);
1038        let idx100 = build_index_page(1, 100, 50, 0, page_size);
1039        let idx101 = build_index_page(2, 101, 30, 0, page_size);
1040        let mut idx101_corrupt = build_index_page(3, 101, 25, 0, page_size);
1041        idx101_corrupt[0] ^= 0xFF;
1042        let file = write_tablespace(&[fsp, idx100, idx101, idx101_corrupt]);
1043
1044        let mut ts = Tablespace::open(file.path().to_str().unwrap()).unwrap();
1045        let report =
1046            simulate_recovery(&mut ts, None, file.path().to_str().unwrap(), false).unwrap();
1047
1048        assert_eq!(report.plan.recommended_level, 1);
1049
1050        // Find indexes — without SDI they'll be grouped into one unknown table
1051        let all_indexes: Vec<&IndexImpact> =
1052            report.tables.iter().flat_map(|t| &t.indexes).collect();
1053        let idx100_impact = all_indexes.iter().find(|i| i.index_id == 100).unwrap();
1054        let idx101_impact = all_indexes.iter().find(|i| i.index_id == 101).unwrap();
1055
1056        assert_eq!(idx100_impact.corrupt_pages, 0);
1057        assert_eq!(idx101_impact.corrupt_pages, 1);
1058    }
1059
1060    #[test]
1061    fn test_level_assessment_cumulative() {
1062        let page_size = 16384u32;
1063        let fsp = build_fsp_page(page_size, 3);
1064        let idx = build_index_page(1, 100, 50, 0, page_size);
1065        let mut corrupt_idx = build_index_page(2, 100, 40, 0, page_size);
1066        corrupt_idx[0] ^= 0xFF;
1067        let file = write_tablespace(&[fsp, idx, corrupt_idx]);
1068
1069        let mut ts = Tablespace::open(file.path().to_str().unwrap()).unwrap();
1070        let report =
1071            simulate_recovery(&mut ts, None, file.path().to_str().unwrap(), false).unwrap();
1072
1073        // Level 3 should include level 1's warnings plus its own
1074        let level1 = &report.plan.levels[1];
1075        let level3 = &report.plan.levels[3];
1076        assert!(level3.warnings.len() >= level1.warnings.len());
1077        // Records at risk should be same at all levels (just corrupt INDEX pages)
1078        let level0 = &report.plan.levels[0];
1079        assert_eq!(level0.total_records_at_risk, level1.total_records_at_risk);
1080        assert_eq!(level1.total_records_at_risk, level3.total_records_at_risk);
1081    }
1082
1083    #[test]
1084    fn test_sdi_name_resolution() {
1085        let page_size = 16384u32;
1086        let fsp = build_fsp_page(page_size, 3);
1087        let idx1 = build_index_page(1, 139, 50, 0, page_size);
1088        let idx2 = build_index_page(2, 140, 30, 0, page_size);
1089        let file = write_tablespace(&[fsp, idx1, idx2]);
1090
1091        let sdi_json = r#"{
1092            "dd_object": {
1093                "name": "test_table",
1094                "indexes": [
1095                    {"name": "PRIMARY", "se_private_data": "id=139;root=1;"},
1096                    {"name": "idx_name", "se_private_data": "id=140;root=2;"}
1097                ]
1098            }
1099        }"#;
1100
1101        let mut ts = Tablespace::open(file.path().to_str().unwrap()).unwrap();
1102        let report = simulate_recovery(
1103            &mut ts,
1104            Some(sdi_json),
1105            file.path().to_str().unwrap(),
1106            false,
1107        )
1108        .unwrap();
1109
1110        assert_eq!(report.tables.len(), 1);
1111        assert_eq!(report.tables[0].table_name.as_deref(), Some("test_table"));
1112        let primary = report.tables[0]
1113            .indexes
1114            .iter()
1115            .find(|i| i.index_id == 139)
1116            .unwrap();
1117        assert_eq!(primary.index_name.as_deref(), Some("PRIMARY"));
1118        assert!(primary.is_clustered);
1119    }
1120}