1use serde::Serialize;
11use std::path::Path;
12use std::path::PathBuf;
13
14use crate::innodb::constants::*;
15use crate::innodb::page::FilHeader;
16use crate::innodb::page_types::PageType;
17
/// A tablespace as known to MySQL: its name paired with its space ID.
#[derive(Debug, Clone, Serialize)]
pub struct TablespaceMapping {
    /// Tablespace name, e.g. `mydb/t1`; `<name>.ibd` is the file path it is matched against.
    pub name: String,
    /// Space ID MySQL has on record for this tablespace.
    pub space_id: u32,
    /// Row format, when known. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_format: Option<String>,
}
29
/// Index metadata from the MySQL side, used to verify index root pages on disk.
#[derive(Debug, Clone, Serialize)]
pub struct MysqlIndexInfo {
    /// Index name (e.g. `PRIMARY`).
    pub name: String,
    /// Table ID associated with the index.
    pub table_id: u64,
    /// Space ID associated with the index.
    pub space_id: u32,
    /// Root page number of the index, when MySQL supplied one.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub page_no: Option<u64>,
}
43
/// Table-level statistics: a row count and an optional auto-increment value.
///
/// NOTE(review): not referenced elsewhere in this file; presumably filled in from
/// MySQL's own statistics — confirm against the code that constructs it.
#[derive(Debug, Clone, Serialize)]
pub struct MysqlTableStats {
    /// Number of rows.
    pub num_rows: u64,
    /// Auto-increment value, if any. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auto_increment: Option<u64>,
}
53
/// A provider of MySQL-side tablespace metadata.
pub trait MysqlSource {
    /// Returns the tablespaces this source knows about.
    ///
    /// # Errors
    ///
    /// Returns a [`crate::IdbError`] when the mappings cannot be produced.
    fn tablespace_mappings(&self) -> Result<Vec<TablespaceMapping>, crate::IdbError>;
}
59
/// A file found on disk that matches no MySQL tablespace, by space ID or by name.
#[derive(Debug, Clone, Serialize)]
pub struct OrphanFile {
    /// Path of the file on disk (lossily converted to UTF-8).
    pub path: String,
    /// Space ID read from the file.
    pub space_id: u32,
}
68
/// A MySQL tablespace for which no corresponding file was found on disk.
#[derive(Debug, Clone, Serialize)]
pub struct MissingFile {
    /// Tablespace name as known to MySQL.
    pub name: String,
    /// Space ID MySQL has on record for the tablespace.
    pub space_id: u32,
}
77
/// A disk file whose path matches a MySQL tablespace name but whose space ID
/// is not one MySQL knows about.
#[derive(Debug, Clone, Serialize)]
pub struct SpaceIdMismatch {
    /// Path of the file on disk (lossily converted to UTF-8).
    pub path: String,
    /// Space ID read from the file.
    pub disk_space_id: u32,
    /// Space ID MySQL has on record for the name-matched tablespace.
    pub mysql_space_id: u32,
    /// Name of the MySQL tablespace the file was matched to.
    pub mysql_name: String,
}
90
/// Outcome of cross-checking the files on disk against MySQL's tablespaces.
#[derive(Debug, Clone, Serialize)]
pub struct ValidationReport {
    /// Number of disk files that were examined.
    pub disk_files: usize,
    /// Number of MySQL tablespaces that were examined.
    pub mysql_tablespaces: usize,
    /// Disk files with no MySQL counterpart.
    pub orphans: Vec<OrphanFile>,
    /// MySQL tablespaces with no disk counterpart.
    pub missing: Vec<MissingFile>,
    /// Disk files matched to a tablespace by name only, with a differing space ID.
    pub mismatches: Vec<SpaceIdMismatch>,
    /// `true` only when `orphans`, `missing` and `mismatches` are all empty.
    pub passed: bool,
}
107
108pub fn cross_validate(disk: &[(PathBuf, u32)], mysql: &[TablespaceMapping]) -> ValidationReport {
113 use std::collections::HashMap;
114
115 let mysql_by_space: HashMap<u32, &TablespaceMapping> =
117 mysql.iter().map(|m| (m.space_id, m)).collect();
118
119 let mut orphans = Vec::new();
120 let mut mismatches = Vec::new();
121 let mut matched_space_ids: std::collections::HashSet<u32> = std::collections::HashSet::new();
122
123 for (path, disk_space_id) in disk {
124 let path_str = path.to_string_lossy().to_string();
125
126 if mysql_by_space.contains_key(disk_space_id) {
128 matched_space_ids.insert(*disk_space_id);
129 } else {
130 let mut found_by_name = false;
132 for m in mysql {
133 let expected_suffix =
135 format!("{}.ibd", m.name.replace('/', std::path::MAIN_SEPARATOR_STR));
136 if path_str.ends_with(&expected_suffix) {
137 mismatches.push(SpaceIdMismatch {
139 path: path_str.clone(),
140 disk_space_id: *disk_space_id,
141 mysql_space_id: m.space_id,
142 mysql_name: m.name.clone(),
143 });
144 matched_space_ids.insert(m.space_id);
145 found_by_name = true;
146 break;
147 }
148 }
149 if !found_by_name {
150 orphans.push(OrphanFile {
151 path: path_str,
152 space_id: *disk_space_id,
153 });
154 }
155 }
156 }
157
158 let missing: Vec<MissingFile> = mysql
160 .iter()
161 .filter(|m| !matched_space_ids.contains(&m.space_id))
162 .map(|m| MissingFile {
163 name: m.name.clone(),
164 space_id: m.space_id,
165 })
166 .collect();
167
168 let passed = orphans.is_empty() && missing.is_empty() && mismatches.is_empty();
169
170 ValidationReport {
171 disk_files: disk.len(),
172 mysql_tablespaces: mysql.len(),
173 orphans,
174 missing,
175 mismatches,
176 passed,
177 }
178}
179
180pub fn detect_orphans(
182 disk: &[(PathBuf, u32)],
183 mysql: &[TablespaceMapping],
184) -> (Vec<OrphanFile>, Vec<MissingFile>) {
185 let report = cross_validate(disk, mysql);
186 (report.orphans, report.missing)
187}
188
/// Outcome of validating a single table's `.ibd` file against MySQL's metadata.
#[derive(Debug, Clone, Serialize)]
pub struct TableValidationReport {
    /// Name of the table that was validated.
    pub table_name: String,
    /// Space ID MySQL has on record for the table.
    pub mysql_space_id: u32,
    /// Space ID parsed from page 0 of the file; `None` when it could not be obtained.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub disk_space_id: Option<u32>,
    /// Path of the file that was examined; `None` when the file was not found.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub file_path: Option<String>,
    /// Whether the on-disk space ID equals `mysql_space_id`.
    pub space_id_match: bool,
    /// Row format taken from the MySQL mapping, if any.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mysql_row_format: Option<String>,
    /// Number of entries in `indexes` whose root page was verified.
    pub indexes_verified: usize,
    /// Per-index results, one per index supplied for validation.
    pub indexes: Vec<IndexValidation>,
    /// `true` only when the space IDs match and every index root page verified.
    pub passed: bool,
}
214
/// Result of checking one index's root page.
#[derive(Debug, Clone, Serialize)]
pub struct IndexValidation {
    /// Index name.
    pub name: String,
    /// Root page number that was checked, when MySQL supplied one.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub root_page: Option<u64>,
    /// Whether the root page was found in the file and is an INDEX page.
    pub root_page_valid: bool,
    /// Explanation of the failure when `root_page_valid` is `false`.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
229
230pub fn deep_validate_table(
237 datadir: &Path,
238 table_name: &str,
239 mapping: &TablespaceMapping,
240 indexes: &[MysqlIndexInfo],
241 page_size: Option<u32>,
242 _use_mmap: bool,
243) -> TableValidationReport {
244 let ps = page_size.unwrap_or(SIZE_PAGE_DEFAULT) as usize;
245 let ibd_path = datadir.join(format!("{}.ibd", table_name));
246
247 if !ibd_path.exists() {
248 return TableValidationReport {
249 table_name: table_name.to_string(),
250 mysql_space_id: mapping.space_id,
251 disk_space_id: None,
252 file_path: None,
253 space_id_match: false,
254 mysql_row_format: mapping.row_format.clone(),
255 indexes_verified: 0,
256 indexes: indexes
257 .iter()
258 .map(|idx| IndexValidation {
259 name: idx.name.clone(),
260 root_page: idx.page_no,
261 root_page_valid: false,
262 message: Some("File not found".to_string()),
263 })
264 .collect(),
265 passed: false,
266 };
267 }
268
269 let file_path_str = ibd_path.display().to_string();
270
271 let file_data = match std::fs::read(&ibd_path) {
272 Ok(data) => data,
273 Err(e) => {
274 return TableValidationReport {
275 table_name: table_name.to_string(),
276 mysql_space_id: mapping.space_id,
277 disk_space_id: None,
278 file_path: Some(file_path_str),
279 space_id_match: false,
280 mysql_row_format: mapping.row_format.clone(),
281 indexes_verified: 0,
282 indexes: indexes
283 .iter()
284 .map(|idx| IndexValidation {
285 name: idx.name.clone(),
286 root_page: idx.page_no,
287 root_page_valid: false,
288 message: Some(format!("Cannot read file: {}", e)),
289 })
290 .collect(),
291 passed: false,
292 };
293 }
294 };
295
296 if file_data.len() < ps {
297 return TableValidationReport {
298 table_name: table_name.to_string(),
299 mysql_space_id: mapping.space_id,
300 disk_space_id: None,
301 file_path: Some(file_path_str),
302 space_id_match: false,
303 mysql_row_format: mapping.row_format.clone(),
304 indexes_verified: 0,
305 indexes: indexes
306 .iter()
307 .map(|idx| IndexValidation {
308 name: idx.name.clone(),
309 root_page: idx.page_no,
310 root_page_valid: false,
311 message: Some("File too small to contain page 0".to_string()),
312 })
313 .collect(),
314 passed: false,
315 };
316 }
317
318 let page0 = &file_data[..ps];
320 let disk_space_id = FilHeader::parse(page0).map(|h| h.space_id);
321 let space_id_match = disk_space_id == Some(mapping.space_id);
322
323 let total_pages = file_data.len() / ps;
325 let mut index_validations = Vec::with_capacity(indexes.len());
326 let mut all_indexes_valid = true;
327
328 for idx in indexes {
329 let page_no = match idx.page_no {
330 Some(pn) => pn,
331 None => {
332 index_validations.push(IndexValidation {
333 name: idx.name.clone(),
334 root_page: None,
335 root_page_valid: false,
336 message: Some("No root page number from MySQL".to_string()),
337 });
338 all_indexes_valid = false;
339 continue;
340 }
341 };
342
343 if page_no as usize >= total_pages {
344 index_validations.push(IndexValidation {
345 name: idx.name.clone(),
346 root_page: Some(page_no),
347 root_page_valid: false,
348 message: Some(format!(
349 "Root page {} beyond file extent ({} pages)",
350 page_no, total_pages
351 )),
352 });
353 all_indexes_valid = false;
354 continue;
355 }
356
357 let page_offset = page_no as usize * ps;
358 let page_data = &file_data[page_offset..page_offset + ps];
359
360 match FilHeader::parse(page_data) {
361 Some(hdr) => {
362 let is_index = hdr.page_type == PageType::Index;
363 if is_index {
364 index_validations.push(IndexValidation {
365 name: idx.name.clone(),
366 root_page: Some(page_no),
367 root_page_valid: true,
368 message: None,
369 });
370 } else {
371 index_validations.push(IndexValidation {
372 name: idx.name.clone(),
373 root_page: Some(page_no),
374 root_page_valid: false,
375 message: Some(format!(
376 "Root page {} has type {} (expected INDEX)",
377 page_no,
378 hdr.page_type.name()
379 )),
380 });
381 all_indexes_valid = false;
382 }
383 }
384 None => {
385 index_validations.push(IndexValidation {
386 name: idx.name.clone(),
387 root_page: Some(page_no),
388 root_page_valid: false,
389 message: Some(format!("Cannot parse FIL header on page {}", page_no)),
390 });
391 all_indexes_valid = false;
392 }
393 }
394 }
395
396 let indexes_verified = index_validations
397 .iter()
398 .filter(|v| v.root_page_valid)
399 .count();
400 let passed = space_id_match && all_indexes_valid;
401
402 TableValidationReport {
403 table_name: table_name.to_string(),
404 mysql_space_id: mapping.space_id,
405 disk_space_id,
406 file_path: Some(file_path_str),
407 space_id_match,
408 mysql_row_format: mapping.row_format.clone(),
409 indexes_verified,
410 indexes: index_validations,
411 passed,
412 }
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    use byteorder::{BigEndian, ByteOrder};
    use std::io::Write;
    use tempfile::TempDir;

    const PAGE_SIZE: u32 = 16384;
    const PS: usize = PAGE_SIZE as usize;

    // FIL page type codes written into the synthetic pages below.
    const CODE_UNDO_LOG: u16 = 2;
    const CODE_FSP_HDR: u16 = 8;
    const CODE_INDEX: u16 = 17855;

    // ---- fixture helpers -------------------------------------------------

    /// A disk-side entry: file path plus the space ID found in it.
    fn on_disk(path: &str, space_id: u32) -> (PathBuf, u32) {
        (PathBuf::from(path), space_id)
    }

    /// A MySQL-side tablespace entry without a row format.
    fn tablespace(name: &str, space_id: u32) -> TablespaceMapping {
        TablespaceMapping {
            name: name.to_string(),
            space_id,
            row_format: None,
        }
    }

    /// A MySQL-side index entry with a known root page.
    fn index(name: &str, table_id: u64, space_id: u32, root_page: u64) -> MysqlIndexInfo {
        MysqlIndexInfo {
            name: name.to_string(),
            table_id,
            space_id,
            page_no: Some(root_page),
        }
    }

    /// Runs `deep_validate_table` against the temp datadir with the test page size.
    fn validate(
        tmpdir: &TempDir,
        table: &str,
        mapping: &TablespaceMapping,
        indexes: &[MysqlIndexInfo],
    ) -> TableValidationReport {
        deep_validate_table(
            tmpdir.path(),
            table,
            mapping,
            indexes,
            Some(PAGE_SIZE),
            false,
        )
    }

    /// Stores the XOR of the two CRC-32C region checksums in the first four bytes.
    fn write_crc32c_checksum(page: &mut [u8]) {
        let body_end = page.len() - 8;
        let checksum = crc32c::crc32c(&page[4..26]) ^ crc32c::crc32c(&page[38..body_end]);
        BigEndian::write_u32(&mut page[0..4], checksum);
    }

    /// A zeroed page with FIL header and trailer LSN filled in; checksum not yet written.
    fn unsealed_page(page_num: u32, space_id: u32, type_code: u16, lsn: u32) -> Vec<u8> {
        let mut page = vec![0u8; PS];
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], u64::from(lsn));
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], type_code);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
        let trailer_lsn_at = PS - SIZE_FIL_TRAILER + 4;
        BigEndian::write_u32(&mut page[trailer_lsn_at..], lsn);
        page
    }

    /// Page 0 of a tablespace, including the FSP space ID and size fields.
    fn build_page0(space_id: u32, total_pages: u32) -> Vec<u8> {
        let mut page = unsealed_page(0, space_id, CODE_FSP_HDR, 1000);
        BigEndian::write_u32(&mut page[FIL_PAGE_DATA + FSP_SPACE_ID..], space_id);
        BigEndian::write_u32(&mut page[FIL_PAGE_DATA + FSP_SIZE..], total_pages);
        write_crc32c_checksum(&mut page);
        page
    }

    /// An INDEX-typed page.
    fn build_index_page(page_num: u32, space_id: u32) -> Vec<u8> {
        let mut page = unsealed_page(page_num, space_id, CODE_INDEX, 2000);
        write_crc32c_checksum(&mut page);
        page
    }

    /// An UNDO_LOG-typed page, used where a non-INDEX root page is needed.
    fn build_undo_page(page_num: u32, space_id: u32) -> Vec<u8> {
        let mut page = unsealed_page(page_num, space_id, CODE_UNDO_LOG, 3000);
        write_crc32c_checksum(&mut page);
        page
    }

    /// Writes `pages` back to back into `<tmpdir>/<db>/<table>.ibd`.
    fn write_ibd_file(tmpdir: &TempDir, db: &str, table: &str, pages: &[Vec<u8>]) {
        let db_dir = tmpdir.path().join(db);
        std::fs::create_dir_all(&db_dir).unwrap();
        let mut file = std::fs::File::create(db_dir.join(format!("{}.ibd", table))).unwrap();
        pages.iter().for_each(|page| file.write_all(page).unwrap());
        file.flush().unwrap();
    }

    // ---- cross_validate / detect_orphans ----------------------------------

    #[test]
    fn test_cross_validate_all_match() {
        let disk = [
            on_disk("/data/mydb/t1.ibd", 10),
            on_disk("/data/mydb/t2.ibd", 20),
        ];
        let mysql = [tablespace("mydb/t1", 10), tablespace("mydb/t2", 20)];

        let report = cross_validate(&disk, &mysql);

        assert!(report.passed);
        assert!(report.orphans.is_empty());
        assert!(report.missing.is_empty());
        assert!(report.mismatches.is_empty());
    }

    #[test]
    fn test_cross_validate_orphan_detected() {
        let disk = [
            on_disk("/data/mydb/t1.ibd", 10),
            on_disk("/data/mydb/old.ibd", 99),
        ];
        let mysql = [tablespace("mydb/t1", 10)];

        let report = cross_validate(&disk, &mysql);

        assert!(!report.passed);
        assert_eq!(report.orphans.len(), 1);
        assert_eq!(report.orphans[0].space_id, 99);
    }

    #[test]
    fn test_cross_validate_missing_detected() {
        let disk = [on_disk("/data/mydb/t1.ibd", 10)];
        let mysql = [tablespace("mydb/t1", 10), tablespace("mydb/t2", 20)];

        let report = cross_validate(&disk, &mysql);

        assert!(!report.passed);
        assert_eq!(report.missing.len(), 1);
        assert_eq!(report.missing[0].space_id, 20);
    }

    #[test]
    fn test_cross_validate_empty() {
        let report = cross_validate(&[], &[]);
        assert!(report.passed);
    }

    #[test]
    fn test_detect_orphans_convenience() {
        let disk = [
            on_disk("/data/mydb/t1.ibd", 10),
            on_disk("/data/mydb/orphan.ibd", 99),
        ];
        let mysql = [tablespace("mydb/t1", 10), tablespace("mydb/missing", 50)];

        let (orphans, missing) = detect_orphans(&disk, &mysql);

        assert_eq!(orphans.len(), 1);
        assert_eq!(missing.len(), 1);
    }

    // ---- deep_validate_table ----------------------------------------------

    #[test]
    fn test_deep_validate_matching_space_id() {
        let tmpdir = TempDir::new().unwrap();
        let space_id = 42u32;
        write_ibd_file(
            &tmpdir,
            "testdb",
            "users",
            &[
                build_page0(space_id, 4),
                build_index_page(1, space_id),
                build_index_page(2, space_id),
                build_index_page(3, space_id),
            ],
        );

        let mapping = TablespaceMapping {
            row_format: Some("Dynamic".to_string()),
            ..tablespace("testdb/users", space_id)
        };
        let indexes = [
            index("PRIMARY", 100, space_id, 3),
            index("idx_email", 100, space_id, 2),
        ];

        let report = validate(&tmpdir, "testdb/users", &mapping, &indexes);

        assert!(report.passed);
        assert!(report.space_id_match);
        assert_eq!(report.disk_space_id, Some(space_id));
        assert_eq!(report.indexes_verified, 2);
    }

    #[test]
    fn test_deep_validate_mismatched_space_id() {
        let tmpdir = TempDir::new().unwrap();
        let disk_sid = 42u32;
        let mysql_sid = 99u32;
        write_ibd_file(
            &tmpdir,
            "testdb",
            "orders",
            &[
                build_page0(disk_sid, 3),
                build_index_page(1, disk_sid),
                build_index_page(2, disk_sid),
            ],
        );

        let mapping = tablespace("testdb/orders", mysql_sid);
        let indexes = [index("PRIMARY", 200, mysql_sid, 1)];

        let report = validate(&tmpdir, "testdb/orders", &mapping, &indexes);

        assert!(!report.passed);
        assert!(!report.space_id_match);
        assert_eq!(report.disk_space_id, Some(disk_sid));
        assert!(report.indexes[0].root_page_valid);
    }

    #[test]
    fn test_deep_validate_non_index_root_page() {
        let tmpdir = TempDir::new().unwrap();
        let space_id = 55u32;
        write_ibd_file(
            &tmpdir,
            "testdb",
            "items",
            &[
                build_page0(space_id, 3),
                build_index_page(1, space_id),
                build_undo_page(2, space_id),
            ],
        );

        let mapping = tablespace("testdb/items", space_id);
        let indexes = [
            index("PRIMARY", 300, space_id, 1),
            index("idx_name", 300, space_id, 2),
        ];

        let report = validate(&tmpdir, "testdb/items", &mapping, &indexes);

        assert!(!report.passed);
        assert!(report.space_id_match);
        assert_eq!(report.indexes_verified, 1);
        assert!(report.indexes[0].root_page_valid);
        assert!(!report.indexes[1].root_page_valid);
        let message = report.indexes[1].message.as_deref().unwrap();
        assert!(message.contains("UNDO_LOG"));
    }

    #[test]
    fn test_deep_validate_file_not_found() {
        let tmpdir = TempDir::new().unwrap();
        let mapping = tablespace("testdb/missing", 10);
        let indexes = [index("PRIMARY", 400, 10, 3)];

        let report = validate(&tmpdir, "testdb/missing", &mapping, &indexes);

        assert!(!report.passed);
        assert!(report.disk_space_id.is_none());
        assert!(report.file_path.is_none());
    }

    #[test]
    fn test_deep_validate_root_page_beyond_file() {
        let tmpdir = TempDir::new().unwrap();
        let space_id = 77u32;
        write_ibd_file(
            &tmpdir,
            "testdb",
            "small",
            &[build_page0(space_id, 2), build_index_page(1, space_id)],
        );

        let mapping = tablespace("testdb/small", space_id);
        let indexes = [index("PRIMARY", 500, space_id, 10)];

        let report = validate(&tmpdir, "testdb/small", &mapping, &indexes);

        assert!(!report.passed);
        assert!(report.space_id_match);
        assert!(!report.indexes[0].root_page_valid);
        let message = report.indexes[0].message.as_deref().unwrap();
        assert!(message.contains("beyond file extent"));
    }
}