1use crate::parser::{
11 determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
12};
13use crate::progress::ProgressReader;
14use crate::schema::{Schema, SchemaBuilder, TableId};
15use crate::splitter::Compression;
16use ahash::{AHashMap, AHashSet};
17use serde::Serialize;
18use std::fmt;
19use std::fs::File;
20use std::hash::{Hash, Hasher};
21use std::io::Read;
22use std::path::PathBuf;
23use std::sync::Arc;
24
/// Hard cap on stored issue records, bounding memory on very noisy dumps.
const MAX_ISSUES: usize = 1000;
27
/// How serious a validation finding is; serialized lowercase for JSON output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    Error,
    Warning,
    Info,
}
36
37impl fmt::Display for Severity {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 Severity::Error => write!(f, "ERROR"),
41 Severity::Warning => write!(f, "WARNING"),
42 Severity::Info => write!(f, "INFO"),
43 }
44 }
45}
46
/// Where in the dump an issue was found; every part is optional.
#[derive(Debug, Clone, Serialize)]
pub struct Location {
    /// Table the issue relates to, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table: Option<String>,
    /// 1-based index of the statement within the dump, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub statement_index: Option<u64>,
    /// Approximate source line number, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approx_line: Option<u64>,
}
57
58impl Location {
59 pub fn new() -> Self {
60 Self {
61 table: None,
62 statement_index: None,
63 approx_line: None,
64 }
65 }
66
67 pub fn with_table(mut self, table: impl Into<String>) -> Self {
68 self.table = Some(table.into());
69 self
70 }
71
72 pub fn with_statement(mut self, index: u64) -> Self {
73 self.statement_index = Some(index);
74 self
75 }
76
77 #[allow(dead_code)]
78 pub fn with_line(mut self, line: u64) -> Self {
79 self.approx_line = Some(line);
80 self
81 }
82}
83
84impl Default for Location {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
/// One validation finding: a stable machine-readable code, a severity,
/// a human-readable message, and an optional dump location.
#[derive(Debug, Clone, Serialize)]
pub struct ValidationIssue {
    /// Stable machine-readable code, e.g. "SYNTAX" or "DUPLICATE_PK".
    pub code: &'static str,
    pub severity: Severity,
    /// Human-readable description of the finding.
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub location: Option<Location>,
}
99
100impl ValidationIssue {
101 pub fn error(code: &'static str, message: impl Into<String>) -> Self {
102 Self {
103 code,
104 severity: Severity::Error,
105 message: message.into(),
106 location: None,
107 }
108 }
109
110 pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
111 Self {
112 code,
113 severity: Severity::Warning,
114 message: message.into(),
115 location: None,
116 }
117 }
118
119 pub fn info(code: &'static str, message: impl Into<String>) -> Self {
120 Self {
121 code,
122 severity: Severity::Info,
123 message: message.into(),
124 location: None,
125 }
126 }
127
128 pub fn with_location(mut self, location: Location) -> Self {
129 self.location = Some(location);
130 self
131 }
132}
133
134impl fmt::Display for ValidationIssue {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 write!(f, "{} [{}]", self.severity, self.code)?;
137 if let Some(ref loc) = self.location {
138 if let Some(ref table) = loc.table {
139 write!(f, " table={}", table)?;
140 }
141 if let Some(stmt) = loc.statement_index {
142 write!(f, " stmt={}", stmt)?;
143 }
144 if let Some(line) = loc.approx_line {
145 write!(f, " line~{}", line)?;
146 }
147 }
148 write!(f, ": {}", self.message)
149 }
150}
151
/// Configuration for a validation run.
#[derive(Debug, Clone)]
pub struct ValidateOptions {
    /// Dump file to validate (possibly compressed; detected from the path).
    pub path: PathBuf,
    /// SQL dialect; `None` defaults to MySQL.
    pub dialect: Option<SqlDialect>,
    /// Progress flag — presumably consumed by the caller; not read in this module.
    pub progress: bool,
    /// Strict-mode flag — presumably consumed by the caller; not read in this module.
    pub strict: bool,
    /// JSON-output flag — presumably consumed by the caller; not read in this module.
    pub json: bool,
    /// Per-table row cap; beyond it, PK/FK tracking for that table is dropped.
    pub max_rows_per_table: usize,
    /// Master switch for the schema build and the second (data-check) pass.
    pub fk_checks_enabled: bool,
    /// Optional global budget for tracked PK+FK hashes; exceeding it disables
    /// all PK/FK checks for the remainder of the run.
    pub max_pk_fk_keys: Option<usize>,
}
167
/// Full result of a validation run, serializable to JSON.
#[derive(Debug, Serialize)]
pub struct ValidationSummary {
    /// Display form of the dialect the dump was parsed with.
    pub dialect: String,
    /// Detailed findings (capped at `MAX_ISSUES`).
    pub issues: Vec<ValidationIssue>,
    pub summary: SummaryStats,
    pub checks: CheckResults,
}
176
/// Aggregate counts over the stored issues and scan.
#[derive(Debug, Serialize)]
pub struct SummaryStats {
    /// Number of stored error-severity issues.
    pub errors: usize,
    /// Number of stored warning-severity issues.
    pub warnings: usize,
    /// Number of stored info-severity issues.
    pub info: usize,
    /// Distinct tables seen in CREATE TABLE statements.
    pub tables_scanned: usize,
    /// Total statements read in the first pass.
    pub statements_scanned: u64,
}
185
/// Outcome of each validation category.
#[derive(Debug, Serialize)]
pub struct CheckResults {
    pub syntax: CheckStatus,
    pub encoding: CheckStatus,
    pub ddl_dml_consistency: CheckStatus,
    pub pk_duplicates: CheckStatus,
    pub fk_integrity: CheckStatus,
}
194
/// Result of one validation check category.
#[derive(Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum CheckStatus {
    /// Check ran and found no issues.
    Ok,
    /// Check ran and found this many issues.
    Failed(usize),
    /// Check did not run; the string explains why.
    Skipped(String),
}
202
203impl fmt::Display for CheckStatus {
204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205 match self {
206 CheckStatus::Ok => write!(f, "OK"),
207 CheckStatus::Failed(n) => write!(f, "{} issues", n),
208 CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
209 }
210 }
211}
212
213impl ValidationSummary {
214 pub fn has_errors(&self) -> bool {
215 self.summary.errors > 0
216 }
217
218 pub fn has_warnings(&self) -> bool {
219 self.summary.warnings > 0
220 }
221}
222
/// 64-bit hash standing in for a full PK/FK value tuple; values are compared
/// by hash only, trading a (remote) collision risk for memory savings.
type PkHash = u64;
226
227fn hash_pk_values(values: &smallvec::SmallVec<[mysql_insert::PkValue; 2]>) -> PkHash {
230 let mut hasher = ahash::AHasher::default();
231
232 (values.len() as u8).hash(&mut hasher);
234
235 for v in values {
236 match v {
237 mysql_insert::PkValue::Int(i) => {
238 0u8.hash(&mut hasher);
239 i.hash(&mut hasher);
240 }
241 mysql_insert::PkValue::BigInt(i) => {
242 1u8.hash(&mut hasher);
243 i.hash(&mut hasher);
244 }
245 mysql_insert::PkValue::Text(s) => {
246 2u8.hash(&mut hasher);
247 s.hash(&mut hasher);
248 }
249 mysql_insert::PkValue::Null => {
250 3u8.hash(&mut hasher);
251 }
252 }
253 }
254
255 hasher.finish()
256}
257
/// A deferred FK membership check: does `fk_hash` exist among the parent
/// table's tracked PK hashes? Resolved after the data pass, once all parent
/// rows have been seen.
struct PendingFkCheck {
    child_table_id: TableId,
    parent_table_id: TableId,
    /// Hash of the child's FK value tuple (same hashing scheme as PKs).
    fk_hash: PkHash,
    /// Statement index where the child row appeared, for reporting.
    stmt_idx: u64,
}
266
/// Per-table bookkeeping for the data-check pass.
struct TableState {
    /// Rows seen so far for this table.
    row_count: u64,
    /// Hashes of PK tuples seen; `None` once tracking has been dropped
    /// (per-table row limit or global memory budget exceeded).
    pk_values: Option<AHashSet<PkHash>>,
    /// Column indices that form the primary key.
    pk_column_indices: Vec<usize>,
    /// Number of duplicate-PK rows detected.
    pk_duplicates: u64,
    /// Number of FK rows whose parent row was not found.
    fk_missing_parents: u64,
}
278
279impl TableState {
280 fn new() -> Self {
281 Self {
282 row_count: 0,
283 pk_values: Some(AHashSet::new()),
284 pk_column_indices: Vec::new(),
285 pk_duplicates: 0,
286 fk_missing_parents: 0,
287 }
288 }
289
290 fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
291 self.pk_column_indices = indices;
292 self
293 }
294}
295
/// Streaming validator for SQL dump files.
///
/// Runs up to two passes over the input: a first pass for syntax/encoding/
/// DDL-DML checks plus schema collection, and (when FK checks are enabled)
/// a second pass performing per-row PK/FK data checks.
pub struct Validator {
    options: ValidateOptions,
    issues: Vec<ValidationIssue>,
    dialect: SqlDialect,

    // Tables seen in CREATE TABLE statements (first pass).
    tables_from_ddl: AHashSet<String>,
    // (table name, statement index) for each INSERT/COPY seen (first pass).
    tables_from_dml: Vec<(String, u64)>,
    schema_builder: SchemaBuilder,
    schema: Option<Schema>,

    // Per-table PK/row bookkeeping for the data-check pass.
    table_states: AHashMap<TableId, TableState>,

    // FK references queued during the data pass, resolved at the end.
    pending_fk_checks: Vec<PendingFkCheck>,

    // Optional byte-progress callback; the two passes each report half the
    // file size so the combined range spans 0..file_size.
    progress_fn: Option<Arc<dyn Fn(u64) + Send + Sync>>,

    statement_count: u64,
    // Per-check counters feeding CheckResults.
    syntax_errors: usize,
    encoding_warnings: usize,
    ddl_dml_errors: usize,
    pk_errors: usize,
    fk_errors: usize,

    // Totals used to enforce the optional max_pk_fk_keys memory budget.
    tracked_pk_count: usize,
    tracked_fk_count: usize,
    pk_fk_checks_disabled_due_to_memory: bool,

    // Active Postgres COPY block: (table name, column order, table id).
    current_copy_context: Option<(String, Vec<String>, TableId)>,
}
335
336impl Validator {
337 pub fn new(options: ValidateOptions) -> Self {
338 Self {
339 dialect: options.dialect.unwrap_or(SqlDialect::MySql),
340 options,
341 issues: Vec::new(),
342 tables_from_ddl: AHashSet::new(),
343 tables_from_dml: Vec::new(),
344 schema_builder: SchemaBuilder::new(),
345 schema: None,
346 table_states: AHashMap::new(),
347 pending_fk_checks: Vec::new(),
348 progress_fn: None,
349 statement_count: 0,
350 syntax_errors: 0,
351 encoding_warnings: 0,
352 ddl_dml_errors: 0,
353 pk_errors: 0,
354 fk_errors: 0,
355 tracked_pk_count: 0,
356 tracked_fk_count: 0,
357 pk_fk_checks_disabled_due_to_memory: false,
358 current_copy_context: None,
359 }
360 }
361
362 pub fn with_progress<F>(mut self, f: F) -> Self
365 where
366 F: Fn(u64) + Send + Sync + 'static,
367 {
368 self.progress_fn = Some(Arc::new(f));
369 self
370 }
371
372 fn add_issue(&mut self, issue: ValidationIssue) {
373 if self.issues.len() >= MAX_ISSUES {
374 return;
375 }
376
377 match issue.severity {
378 Severity::Error => match issue.code {
379 "SYNTAX" => self.syntax_errors += 1,
380 "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
381 "DUPLICATE_PK" => self.pk_errors += 1,
382 "FK_MISSING_PARENT" => self.fk_errors += 1,
383 _ => {}
384 },
385 Severity::Warning => {
386 if issue.code == "ENCODING" {
387 self.encoding_warnings += 1;
388 }
389 }
390 Severity::Info => {}
391 }
392
393 self.issues.push(issue);
394 }
395
396 fn enforce_pk_fk_memory_budget(&mut self) {
399 if self.pk_fk_checks_disabled_due_to_memory {
400 return;
401 }
402
403 let Some(limit) = self.options.max_pk_fk_keys else {
404 return;
405 };
406
407 let total_tracked = self.tracked_pk_count + self.tracked_fk_count;
408 if total_tracked > limit {
409 self.pk_fk_checks_disabled_due_to_memory = true;
410
411 for state in self.table_states.values_mut() {
413 state.pk_values = None;
414 }
415 self.pending_fk_checks.clear();
416 self.pending_fk_checks.shrink_to_fit();
417
418 self.add_issue(ValidationIssue::warning(
419 "PK_FK_CHECKS_SKIPPED_MEMORY",
420 format!(
421 "Skipping PK/FK checks after tracking {} keys (memory limit of {} exceeded)",
422 total_tracked, limit
423 ),
424 ));
425 }
426 }
427
    /// Runs the full validation: a streaming first pass over every statement,
    /// then (when FK checks are enabled) a schema build and a second data
    /// pass for PK/FK checks.
    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        let compression = Compression::from_path(&self.options.path);
        // First pass reports bytes/2 so that, combined with the second
        // pass's file_size/2 + bytes/2, progress spans 0..file_size.
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                cb(bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))
        } else {
            compression.wrap_reader(Box::new(file))
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);

        // Pass 1: stream statements, collecting DDL/DML info + schema input.
        loop {
            match parser.read_statement() {
                Ok(Some(stmt)) => {
                    self.statement_count += 1;
                    self.process_statement(&stmt);
                }
                Ok(None) => break,
                Err(e) => {
                    // A parser error aborts the scan; report it against the
                    // statement that would have come next.
                    self.add_issue(
                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
                            .with_location(
                                Location::new().with_statement(self.statement_count + 1),
                            ),
                    );
                    break;
                }
            }
        }

        // DDL/DML consistency: every INSERT/COPY target needs a CREATE TABLE
        // (names compared case-insensitively).
        let missing_table_issues: Vec<_> = self
            .tables_from_dml
            .iter()
            .filter(|(table, _)| {
                let table_lower = table.to_lowercase();
                !self
                    .tables_from_ddl
                    .iter()
                    .any(|t| t.to_lowercase() == table_lower)
            })
            .map(|(table, stmt_idx)| {
                ValidationIssue::error(
                    "DDL_MISSING_TABLE",
                    format!(
                        "INSERT/COPY references table '{}' with no CREATE TABLE",
                        table
                    ),
                )
                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
            })
            .collect();

        for issue in missing_table_issues {
            self.add_issue(issue);
        }

        if self.options.fk_checks_enabled {
            self.schema = Some(self.schema_builder.build());
            // Replace the builder to free its intermediate state.
            self.schema_builder = SchemaBuilder::new();
            self.initialize_table_states();
        }

        // The data pass is only worthwhile when the schema has tables.
        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
        if self.options.fk_checks_enabled && schema_not_empty {
            self.run_data_checks()?;
            self.validate_pending_fk_checks();
        }

        Ok(self.build_summary())
    }
512
513 fn process_statement(&mut self, stmt: &[u8]) {
514 if std::str::from_utf8(stmt).is_err() {
516 self.add_issue(
517 ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
518 .with_location(Location::new().with_statement(self.statement_count)),
519 );
520 }
521
522 let (stmt_type, table_name) =
523 Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);
524
525 match stmt_type {
526 StatementType::CreateTable => {
527 if !table_name.is_empty() {
528 self.tables_from_ddl.insert(table_name.clone());
529
530 if let Ok(stmt_str) = std::str::from_utf8(stmt) {
532 self.schema_builder.parse_create_table(stmt_str);
533 }
534 }
535 }
536 StatementType::AlterTable => {
537 if let Ok(stmt_str) = std::str::from_utf8(stmt) {
539 self.schema_builder.parse_alter_table(stmt_str);
540 }
541 }
542 StatementType::Insert | StatementType::Copy => {
543 if !table_name.is_empty() {
544 self.tables_from_dml
545 .push((table_name, self.statement_count));
546 }
547 }
548 StatementType::Unknown => {
549 }
551 _ => {}
552 }
553 }
554
555 fn initialize_table_states(&mut self) {
556 let schema = match &self.schema {
557 Some(s) => s,
558 None => return,
559 };
560
561 for table_schema in schema.iter() {
562 let pk_indices: Vec<usize> = table_schema
563 .primary_key
564 .iter()
565 .map(|col_id| col_id.0 as usize)
566 .collect();
567
568 let state = TableState::new().with_pk_columns(pk_indices);
569 self.table_states.insert(table_schema.id, state);
570 }
571 }
572
    /// Second pass: re-reads the dump and runs per-row PK/FK checks for
    /// statements whose tables exist in the built schema.
    fn run_data_checks(&mut self) -> anyhow::Result<()> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        let compression = Compression::from_path(&self.options.path);
        // Second pass reports file_size/2 + bytes/2, continuing where the
        // first pass's 0..file_size/2 progress range left off.
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                cb(file_size / 2 + bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))
        } else {
            compression.wrap_reader(Box::new(file))
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
        let mut stmt_count: u64 = 0;

        self.current_copy_context = None;

        while let Some(stmt) = parser.read_statement()? {
            stmt_count += 1;

            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);

            // Postgres COPY data arrives as a follow-up "statement" the
            // parser cannot classify, terminated by a `\.` line. Route it to
            // the COPY context captured from the preceding COPY header.
            if self.dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                if stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n") {
                    if let Some((ref copy_table, ref column_order, copy_table_id)) =
                        self.current_copy_context.clone()
                    {
                        self.check_copy_data(
                            &stmt,
                            copy_table_id,
                            copy_table,
                            column_order.clone(),
                            stmt_count,
                        );
                    }
                }
                // Any unknown statement ends the active COPY block.
                self.current_copy_context = None;
                continue;
            }

            // Only tables present in the built schema can be checked.
            let table_id = match &self.schema {
                Some(s) => match s.get_table_id(&table_name) {
                    Some(id) => id,
                    None => continue,
                },
                None => continue,
            };

            match stmt_type {
                StatementType::Insert => {
                    self.check_insert_statement(&stmt, table_id, &table_name, stmt_count);
                }
                StatementType::Copy => {
                    // Remember the COPY header; its data block follows as the
                    // next (Unknown) statement.
                    let header = String::from_utf8_lossy(&stmt);
                    let column_order = postgres_copy::parse_copy_columns(&header);
                    self.current_copy_context = Some((table_name.clone(), column_order, table_id));
                }
                _ => continue,
            }
        }

        Ok(())
    }
650
651 fn check_insert_statement(
653 &mut self,
654 stmt: &[u8],
655 table_id: TableId,
656 table_name: &str,
657 stmt_count: u64,
658 ) {
659 let table_schema = match &self.schema {
660 Some(s) => match s.table(table_id) {
661 Some(ts) => ts,
662 None => return,
663 },
664 None => return,
665 };
666
667 let rows = match mysql_insert::parse_mysql_insert_rows(stmt, table_schema) {
669 Ok(r) => r,
670 Err(_) => return,
671 };
672
673 for row in rows {
674 self.check_mysql_row(table_id, table_name, &row, stmt_count);
675 }
676 }
677
    /// Checks a self-contained COPY statement whose data section is inline
    /// (header and rows in a single statement).
    ///
    /// NOTE(review): not referenced from the visible portion of this file —
    /// `run_data_checks` routes COPY data through `check_copy_data` instead.
    /// Confirm whether this path is still reachable or is dead code.
    fn check_copy_statement(
        &mut self,
        stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        stmt_count: u64,
    ) {
        // Inline COPY handling requires valid UTF-8.
        let stmt_str = match std::str::from_utf8(stmt) {
            Ok(s) => s,
            Err(_) => return,
        };

        // Locate the start of the data section after "FROM stdin;"
        // (either case).
        let data_start = if let Some(pos) = stmt_str.find("FROM stdin;") {
            pos + "FROM stdin;".len()
        } else if let Some(pos) = stmt_str.find("from stdin;") {
            pos + "from stdin;".len()
        } else {
            return;
        };

        let data_section = stmt_str[data_start..].trim_start();
        if data_section.is_empty() {
            return;
        }

        // Column order comes from the COPY header's column list.
        let header = &stmt_str[..data_start];
        let column_order = postgres_copy::parse_copy_columns(header);

        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        let rows = match postgres_copy::parse_postgres_copy_rows(
            data_section.as_bytes(),
            table_schema,
            column_order,
        ) {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_copy_row(table_id, table_name, &row, stmt_count);
        }
    }
734
735 fn check_copy_data(
737 &mut self,
738 data_stmt: &[u8],
739 table_id: TableId,
740 table_name: &str,
741 column_order: Vec<String>,
742 stmt_count: u64,
743 ) {
744 let data: Vec<u8> = data_stmt
747 .iter()
748 .skip_while(|&&b| b == b'\n' || b == b'\r' || b == b' ' || b == b'\t')
749 .cloned()
750 .collect();
751
752 if data.is_empty() {
753 return;
754 }
755
756 let table_schema = match &self.schema {
758 Some(s) => match s.table(table_id) {
759 Some(ts) => ts,
760 None => return,
761 },
762 None => return,
763 };
764
765 let rows = match postgres_copy::parse_postgres_copy_rows(&data, table_schema, column_order)
767 {
768 Ok(r) => r,
769 Err(_) => return,
770 };
771
772 for row in rows {
773 self.check_copy_row(table_id, table_name, &row, stmt_count);
774 }
775 }
776
777 fn check_mysql_row(
779 &mut self,
780 table_id: TableId,
781 table_name: &str,
782 row: &mysql_insert::ParsedRow,
783 stmt_idx: u64,
784 ) {
785 self.check_row_common(
786 table_id,
787 table_name,
788 row.pk.as_ref(),
789 &row.fk_values,
790 stmt_idx,
791 );
792 }
793
794 fn check_copy_row(
796 &mut self,
797 table_id: TableId,
798 table_name: &str,
799 row: &postgres_copy::ParsedCopyRow,
800 stmt_idx: u64,
801 ) {
802 self.check_row_common(
803 table_id,
804 table_name,
805 row.pk.as_ref(),
806 &row.fk_values,
807 stmt_idx,
808 );
809 }
810
    /// Shared per-row logic: row counting, duplicate-PK detection, and
    /// queueing of FK membership checks for later resolution.
    fn check_row_common(
        &mut self,
        table_id: TableId,
        table_name: &str,
        pk: Option<&smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
        fk_values: &[(
            mysql_insert::FkRef,
            smallvec::SmallVec<[mysql_insert::PkValue; 2]>,
        )],
        stmt_idx: u64,
    ) {
        // All tracking stops once the global key budget was exceeded.
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        let max_rows = self.options.max_rows_per_table as u64;

        let state = match self.table_states.get_mut(&table_id) {
            Some(s) => s,
            None => return,
        };

        state.row_count += 1;

        // Per-table row cap: drop this table's PK tracking (once, with a
        // warning) and skip further per-row work.
        if state.row_count > max_rows {
            if state.pk_values.is_some() {
                state.pk_values = None;
                self.add_issue(
                    ValidationIssue::warning(
                        "PK_CHECK_SKIPPED",
                        format!(
                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
                            table_name, max_rows
                        ),
                    )
                    .with_location(Location::new().with_table(table_name)),
                );
            }
            return;
        }

        // Duplicate-PK detection using hashed PK tuples.
        if let Some(pk_values) = pk {
            if let Some(ref mut pk_set) = state.pk_values {
                let pk_hash = hash_pk_values(pk_values);

                if pk_set.insert(pk_hash) {
                    // New key: count it against the global budget.
                    self.tracked_pk_count += 1;
                    self.enforce_pk_fk_memory_budget();
                } else {
                    state.pk_duplicates += 1;

                    // Render the PK tuple for the error message.
                    let pk_display: String = pk_values
                        .iter()
                        .map(|v| match v {
                            mysql_insert::PkValue::Int(i) => i.to_string(),
                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
                            mysql_insert::PkValue::Text(s) => s.to_string(),
                            mysql_insert::PkValue::Null => "NULL".to_string(),
                        })
                        .collect::<Vec<_>>()
                        .join(", ");

                    self.add_issue(
                        ValidationIssue::error(
                            "DUPLICATE_PK",
                            format!(
                                "Duplicate primary key in table '{}': ({})",
                                table_name, pk_display
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(table_name)
                                .with_statement(stmt_idx),
                        ),
                    );
                }
            }
        }

        // The budget check above may have just disabled tracking.
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        // Queue FK checks; they are resolved only after the full data pass
        // so parent rows appearing later in the dump are still found.
        let new_fk_checks: Vec<PendingFkCheck> = {
            let schema = match &self.schema {
                Some(s) => s,
                None => return,
            };

            let table_schema = match schema.table(table_id) {
                Some(t) => t,
                None => return,
            };

            fk_values
                .iter()
                .filter_map(|(fk_ref, fk_vals)| {
                    // All-NULL FK tuples are not constraint violations.
                    if fk_vals.iter().all(|v| v.is_null()) {
                        return None;
                    }

                    let fk_def = table_schema.foreign_keys.get(fk_ref.fk_index as usize)?;
                    let parent_table_id = fk_def.referenced_table_id?;

                    // FK tuples hash with the same scheme as PK tuples.
                    let fk_hash = hash_pk_values(fk_vals);

                    Some(PendingFkCheck {
                        child_table_id: table_id,
                        parent_table_id,
                        fk_hash,
                        stmt_idx,
                    })
                })
                .collect()
        };

        let new_count = new_fk_checks.len();
        self.pending_fk_checks.extend(new_fk_checks);
        self.tracked_fk_count += new_count;

        if new_count > 0 {
            self.enforce_pk_fk_memory_budget();
        }
    }
949
950 fn validate_pending_fk_checks(&mut self) {
952 for check in std::mem::take(&mut self.pending_fk_checks) {
953 let parent_has_pk = self
954 .table_states
955 .get(&check.parent_table_id)
956 .and_then(|s| s.pk_values.as_ref())
957 .is_some_and(|set| set.contains(&check.fk_hash));
958
959 if !parent_has_pk {
960 let state = match self.table_states.get_mut(&check.child_table_id) {
961 Some(s) => s,
962 None => continue,
963 };
964 state.fk_missing_parents += 1;
965
966 if state.fk_missing_parents <= 5 {
968 let (child_name, parent_name) = if let Some(schema) = &self.schema {
970 let child = schema
971 .table(check.child_table_id)
972 .map(|t| t.name.clone())
973 .unwrap_or_else(|| "<unknown>".to_string());
974 let parent = schema
975 .table(check.parent_table_id)
976 .map(|t| t.name.clone())
977 .unwrap_or_else(|| "<unknown>".to_string());
978 (child, parent)
979 } else {
980 ("<unknown>".to_string(), "<unknown>".to_string())
981 };
982
983 self.add_issue(
984 ValidationIssue::error(
985 "FK_MISSING_PARENT",
986 format!(
987 "FK violation in '{}': references missing row in '{}'",
988 child_name, parent_name
989 ),
990 )
991 .with_location(
992 Location::new()
993 .with_table(child_name)
994 .with_statement(check.stmt_idx),
995 ),
996 );
997 }
998 }
999 }
1000 }
1001
1002 fn build_summary(&self) -> ValidationSummary {
1003 let errors = self
1004 .issues
1005 .iter()
1006 .filter(|i| matches!(i.severity, Severity::Error))
1007 .count();
1008 let warnings = self
1009 .issues
1010 .iter()
1011 .filter(|i| matches!(i.severity, Severity::Warning))
1012 .count();
1013 let info = self
1014 .issues
1015 .iter()
1016 .filter(|i| matches!(i.severity, Severity::Info))
1017 .count();
1018
1019 let syntax_status = if self.syntax_errors > 0 {
1020 CheckStatus::Failed(self.syntax_errors)
1021 } else {
1022 CheckStatus::Ok
1023 };
1024
1025 let encoding_status = if self.encoding_warnings > 0 {
1026 CheckStatus::Failed(self.encoding_warnings)
1027 } else {
1028 CheckStatus::Ok
1029 };
1030
1031 let ddl_dml_status = if self.ddl_dml_errors > 0 {
1032 CheckStatus::Failed(self.ddl_dml_errors)
1033 } else {
1034 CheckStatus::Ok
1035 };
1036
1037 let pk_status = if !self.options.fk_checks_enabled {
1038 CheckStatus::Skipped("--no-fk-checks".to_string())
1039 } else if self.pk_fk_checks_disabled_due_to_memory {
1040 CheckStatus::Skipped("memory limit exceeded".to_string())
1041 } else if self.pk_errors > 0 {
1042 CheckStatus::Failed(self.pk_errors)
1043 } else {
1044 CheckStatus::Ok
1045 };
1046
1047 let fk_status = if !self.options.fk_checks_enabled {
1048 CheckStatus::Skipped("--no-fk-checks".to_string())
1049 } else if self.pk_fk_checks_disabled_due_to_memory {
1050 CheckStatus::Skipped("memory limit exceeded".to_string())
1051 } else if self.fk_errors > 0 {
1052 CheckStatus::Failed(self.fk_errors)
1053 } else {
1054 CheckStatus::Ok
1055 };
1056
1057 ValidationSummary {
1058 dialect: self.dialect.to_string(),
1059 issues: self.issues.clone(),
1060 summary: SummaryStats {
1061 errors,
1062 warnings,
1063 info,
1064 tables_scanned: self.tables_from_ddl.len(),
1065 statements_scanned: self.statement_count,
1066 },
1067 checks: CheckResults {
1068 syntax: syntax_status,
1069 encoding: encoding_status,
1070 ddl_dml_consistency: ddl_dml_status,
1071 pk_duplicates: pk_status,
1072 fk_integrity: fk_status,
1073 },
1074 }
1075 }
1076}