use crate::parser::{
    determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
};
use crate::progress::ProgressReader;
use crate::schema::{Schema, SchemaBuilder, TableId};
use crate::splitter::Compression;
use ahash::{AHashMap, AHashSet};
use schemars::JsonSchema;
use serde::Serialize;
use std::fmt;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;

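/// Hard cap on collected issues; once reached, further issues are dropped
/// (and no longer counted) so a badly broken dump cannot exhaust memory.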
const MAX_ISSUES: usize = 1000;

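/// Severity of a validation finding; serialized in lowercase for JSON output.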
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    Error,
    Warning,
    Info,
}

impl fmt::Display for Severity {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Severity::Error => write!(f, "ERROR"),
            Severity::Warning => write!(f, "WARNING"),
            Severity::Info => write!(f, "INFO"),
        }
    }
}

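/// Best-effort position of a finding: table name, 1-based statement index,
/// and/or approximate line number, each optional.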
#[derive(Debug, Clone, Serialize, JsonSchema)]
pub struct Location {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub statement_index: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approx_line: Option<u64>,
}

impl Location {
    pub fn new() -> Self {
        Self {
            table: None,
            statement_index: None,
            approx_line: None,
        }
    }

    pub fn with_table(mut self, table: impl Into<String>) -> Self {
        self.table = Some(table.into());
        self
    }

    pub fn with_statement(mut self, index: u64) -> Self {
        self.statement_index = Some(index);
        self
    }

    #[allow(dead_code)]
    pub fn with_line(mut self, line: u64) -> Self {
        self.approx_line = Some(line);
        self
    }
}

impl Default for Location {
    fn default() -> Self {
        Self::new()
    }
}

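/// A single finding: a stable machine-readable `code`, a severity, a
/// human-readable message, and an optional location.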
#[derive(Debug, Clone, Serialize, JsonSchema)]
pub struct ValidationIssue {
    pub code: &'static str,
    pub severity: Severity,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub location: Option<Location>,
}

impl ValidationIssue {
    pub fn error(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            severity: Severity::Error,
            message: message.into(),
            location: None,
        }
    }

    pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            severity: Severity::Warning,
            message: message.into(),
            location: None,
        }
    }

    pub fn info(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            severity: Severity::Info,
            message: message.into(),
            location: None,
        }
    }

    pub fn with_location(mut self, location: Location) -> Self {
        self.location = Some(location);
        self
    }
}

impl fmt::Display for ValidationIssue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} [{}]", self.severity, self.code)?;
        if let Some(ref loc) = self.location {
            if let Some(ref table) = loc.table {
                write!(f, " table={}", table)?;
            }
            if let Some(stmt) = loc.statement_index {
                write!(f, " stmt={}", stmt)?;
            }
            if let Some(line) = loc.approx_line {
                write!(f, " line~{}", line)?;
            }
        }
        write!(f, ": {}", self.message)
    }
}

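/// Options for a validation run.
///
/// `max_rows_per_table` bounds how many rows per table are PK/FK-checked, and
/// `max_pk_fk_keys` (when `Some`) caps the total number of PK/FK keys tracked
/// in memory before those checks are abandoned entirely.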
#[derive(Debug, Clone)]
pub struct ValidateOptions {
    pub path: PathBuf,
    pub dialect: Option<SqlDialect>,
    pub progress: bool,
    pub strict: bool,
    pub json: bool,
    pub max_rows_per_table: usize,
    pub fk_checks_enabled: bool,
    pub max_pk_fk_keys: Option<usize>,
}

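/// Complete result of a validation run; this is the shape serialized for the
/// JSON report.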
#[derive(Debug, Serialize, JsonSchema)]
pub struct ValidationSummary {
    pub dialect: String,
    pub issues: Vec<ValidationIssue>,
    pub summary: SummaryStats,
    pub checks: CheckResults,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct SummaryStats {
    pub errors: usize,
    pub warnings: usize,
    pub info: usize,
    pub tables_scanned: usize,
    pub statements_scanned: u64,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct CheckResults {
    pub syntax: CheckStatus,
    pub encoding: CheckStatus,
    pub ddl_dml_consistency: CheckStatus,
    pub pk_duplicates: CheckStatus,
    pub fk_integrity: CheckStatus,
}

#[derive(Debug, Serialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum CheckStatus {
    Ok,
    Failed(usize),
    Skipped(String),
}

impl fmt::Display for CheckStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CheckStatus::Ok => write!(f, "OK"),
            CheckStatus::Failed(n) => write!(f, "{} issues", n),
            CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
        }
    }
}

impl ValidationSummary {
    pub fn has_errors(&self) -> bool {
        self.summary.errors > 0
    }

    pub fn has_warnings(&self) -> bool {
        self.summary.warnings > 0
    }
}

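/// Primary keys are tracked as 64-bit ahash digests rather than owned values,
/// trading a vanishingly small collision risk for bounded, predictable memory.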
type PkHash = u64;

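/// Hashes a (possibly composite) key value. The key arity and a per-variant
/// tag are mixed in first so that, e.g., `Int(1)` and `Text("1")` cannot
/// produce the same digest by accident.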
fn hash_pk_values(values: &smallvec::SmallVec<[mysql_insert::PkValue; 2]>) -> PkHash {
    let mut hasher = ahash::AHasher::default();

    (values.len() as u8).hash(&mut hasher);

    for v in values {
        match v {
            mysql_insert::PkValue::Int(i) => {
                0u8.hash(&mut hasher);
                i.hash(&mut hasher);
            }
            mysql_insert::PkValue::BigInt(i) => {
                1u8.hash(&mut hasher);
                i.hash(&mut hasher);
            }
            mysql_insert::PkValue::Text(s) => {
                2u8.hash(&mut hasher);
                s.hash(&mut hasher);
            }
            mysql_insert::PkValue::Null => {
                3u8.hash(&mut hasher);
            }
        }
    }

    hasher.finish()
}

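/// An FK reference observed during the data pass. Resolution is deferred
/// until all rows have been read so that forward references within the dump
/// do not produce false positives.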
struct PendingFkCheck {
    child_table_id: TableId,
    parent_table_id: TableId,
    fk_hash: PkHash,
    stmt_idx: u64,
}

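/// Per-table bookkeeping for the data pass. `pk_values` becomes `None` once
/// PK tracking is abandoned for the table (row limit or memory budget hit).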
struct TableState {
    row_count: u64,
    pk_values: Option<AHashSet<PkHash>>,
    pk_column_indices: Vec<usize>,
    pk_duplicates: u64,
    fk_missing_parents: u64,
}

impl TableState {
    fn new() -> Self {
        Self {
            row_count: 0,
            pk_values: Some(AHashSet::new()),
            pk_column_indices: Vec::new(),
            pk_duplicates: 0,
            fk_missing_parents: 0,
        }
    }

    fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
        self.pk_column_indices = indices;
        self
    }
}

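/// Streaming dump validator.
///
/// Pass one scans every statement for syntax, encoding, and DDL/DML
/// consistency while building the schema; pass two (only when FK checks are
/// enabled) re-reads the file to run row-level PK/FK checks.
///
/// A minimal sketch of the intended call pattern (`opts` is a hypothetical
/// `ValidateOptions` built by the caller):
///
/// ```ignore
/// let summary = Validator::new(opts)
///     .with_progress(|bytes| eprintln!("processed ~{bytes} bytes"))
///     .validate()?;
/// if summary.has_errors() {
///     std::process::exit(1);
/// }
/// ```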
pub struct Validator {
    options: ValidateOptions,
    issues: Vec<ValidationIssue>,
    dialect: SqlDialect,

    tables_from_ddl: AHashSet<String>,
    tables_from_dml: Vec<(String, u64)>,

    schema_builder: SchemaBuilder,
    schema: Option<Schema>,

    table_states: AHashMap<TableId, TableState>,

    pending_fk_checks: Vec<PendingFkCheck>,

    progress_fn: Option<Arc<dyn Fn(u64) + Send + Sync>>,

    statement_count: u64,
    syntax_errors: usize,
    encoding_warnings: usize,
    ddl_dml_errors: usize,
    pk_errors: usize,
    fk_errors: usize,

    tracked_pk_count: usize,
    tracked_fk_count: usize,
    pk_fk_checks_disabled_due_to_memory: bool,

    current_copy_context: Option<(String, Vec<String>, TableId)>,
}

impl Validator {
    pub fn new(options: ValidateOptions) -> Self {
        Self {
            dialect: options.dialect.unwrap_or(SqlDialect::MySql),
            options,
            issues: Vec::new(),
            tables_from_ddl: AHashSet::new(),
            tables_from_dml: Vec::new(),
            schema_builder: SchemaBuilder::new(),
            schema: None,
            table_states: AHashMap::new(),
            pending_fk_checks: Vec::new(),
            progress_fn: None,
            statement_count: 0,
            syntax_errors: 0,
            encoding_warnings: 0,
            ddl_dml_errors: 0,
            pk_errors: 0,
            fk_errors: 0,
            tracked_pk_count: 0,
            tracked_fk_count: 0,
            pk_fk_checks_disabled_due_to_memory: false,
            current_copy_context: None,
        }
    }

    pub fn with_progress<F>(mut self, f: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        self.progress_fn = Some(Arc::new(f));
        self
    }

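    /// Stores an issue (up to `MAX_ISSUES`) and updates the per-check counter
    /// behind the final `CheckResults`.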
    fn add_issue(&mut self, issue: ValidationIssue) {
        if self.issues.len() >= MAX_ISSUES {
            return;
        }

        match issue.severity {
            Severity::Error => match issue.code {
                "SYNTAX" => self.syntax_errors += 1,
                "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
                "DUPLICATE_PK" => self.pk_errors += 1,
                "FK_MISSING_PARENT" => self.fk_errors += 1,
                _ => {}
            },
            Severity::Warning => {
                if issue.code == "ENCODING" {
                    self.encoding_warnings += 1;
                }
            }
            Severity::Info => {}
        }

        self.issues.push(issue);
    }

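    /// If the configured key budget is exceeded, permanently disables PK/FK
    /// tracking and frees everything accumulated so far.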
    fn enforce_pk_fk_memory_budget(&mut self) {
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        let Some(limit) = self.options.max_pk_fk_keys else {
            return;
        };

        let total_tracked = self.tracked_pk_count + self.tracked_fk_count;
        if total_tracked > limit {
            self.pk_fk_checks_disabled_due_to_memory = true;

            for state in self.table_states.values_mut() {
                state.pk_values = None;
            }
            self.pending_fk_checks.clear();
            self.pending_fk_checks.shrink_to_fit();

            self.add_issue(ValidationIssue::warning(
                "PK_FK_CHECKS_SKIPPED_MEMORY",
                format!(
                    "Skipping PK/FK checks after tracking {} keys (memory limit of {} exceeded)",
                    total_tracked, limit
                ),
            ));
        }
    }

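    /// Runs the full validation. Progress callbacks treat pass one as the
    /// first half of the work and the data pass as the second half, so
    /// reported progress maps to 0-50% and 50-100% of the (compressed) file.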
    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| cb(bytes / 2));
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);

        loop {
            match parser.read_statement() {
                Ok(Some(stmt)) => {
                    self.statement_count += 1;
                    self.process_statement(&stmt);
                }
                Ok(None) => break,
                Err(e) => {
                    self.add_issue(
                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
                            .with_location(
                                Location::new().with_statement(self.statement_count + 1),
                            ),
                    );
                    break;
                }
            }
        }

        let missing_table_issues: Vec<_> = self
            .tables_from_dml
            .iter()
            .filter(|(table, _)| {
                let table_lower = table.to_lowercase();
                !self
                    .tables_from_ddl
                    .iter()
                    .any(|t| t.to_lowercase() == table_lower)
            })
            .map(|(table, stmt_idx)| {
                ValidationIssue::error(
                    "DDL_MISSING_TABLE",
                    format!(
                        "INSERT/COPY references table '{}' with no CREATE TABLE",
                        table
                    ),
                )
                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
            })
            .collect();

        for issue in missing_table_issues {
            self.add_issue(issue);
        }

        if self.options.fk_checks_enabled {
            self.schema = Some(self.schema_builder.build());
            self.schema_builder = SchemaBuilder::new();
            self.initialize_table_states();
        }

        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
        if self.options.fk_checks_enabled && schema_not_empty {
            self.run_data_checks()?;
            self.validate_pending_fk_checks();
        }

        Ok(self.build_summary())
    }

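    /// First-pass handler: flags invalid UTF-8, feeds CREATE/ALTER TABLE into
    /// the schema builder, and records which tables the DML touches for the
    /// DDL/DML consistency check.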
    fn process_statement(&mut self, stmt: &[u8]) {
        if std::str::from_utf8(stmt).is_err() {
            self.add_issue(
                ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
                    .with_location(Location::new().with_statement(self.statement_count)),
            );
        }

        let (stmt_type, table_name) =
            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);

        match stmt_type {
            StatementType::CreateTable => {
                if !table_name.is_empty() {
                    self.tables_from_ddl.insert(table_name.clone());

                    if let Ok(stmt_str) = std::str::from_utf8(stmt) {
                        self.schema_builder.parse_create_table(stmt_str);
                    }
                }
            }
            StatementType::AlterTable => {
                if let Ok(stmt_str) = std::str::from_utf8(stmt) {
                    self.schema_builder.parse_alter_table(stmt_str);
                }
            }
            StatementType::Insert | StatementType::Copy => {
                if !table_name.is_empty() {
                    self.tables_from_dml
                        .push((table_name, self.statement_count));
                }
            }
            StatementType::Unknown => {}
            _ => {}
        }
    }

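    /// Creates a `TableState` for every table in the built schema, caching
    /// each table's primary-key column indices.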
    fn initialize_table_states(&mut self) {
        let schema = match &self.schema {
            Some(s) => s,
            None => return,
        };

        for table_schema in schema.iter() {
            let pk_indices: Vec<usize> = table_schema
                .primary_key
                .iter()
                .map(|col_id| col_id.0 as usize)
                .collect();

            let state = TableState::new().with_pk_columns(pk_indices);
            self.table_states.insert(table_schema.id, state);
        }
    }

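    /// Second pass over the file: dispatches INSERT statements and PostgreSQL
    /// COPY blocks (header first, then the streamed data chunk) to the
    /// row-level checks.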
    fn run_data_checks(&mut self) -> anyhow::Result<()> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader =
                ProgressReader::new(file, move |bytes| cb(file_size / 2 + bytes / 2));
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
        let mut stmt_count: u64 = 0;

        self.current_copy_context = None;

        while let Some(stmt) = parser.read_statement()? {
            stmt_count += 1;

            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);

            if self.dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                if stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n") {
                    if let Some((ref copy_table, ref column_order, copy_table_id)) =
                        self.current_copy_context.clone()
                    {
                        self.check_copy_data(
                            &stmt,
                            copy_table_id,
                            copy_table,
                            column_order.clone(),
                            stmt_count,
                        );
                    }
                }
                self.current_copy_context = None;
                continue;
            }

            let table_id = match &self.schema {
                Some(s) => match s.get_table_id(&table_name) {
                    Some(id) => id,
                    None => continue,
                },
                None => continue,
            };

            match stmt_type {
                StatementType::Insert => {
                    self.check_insert_statement(&stmt, table_id, &table_name, stmt_count);
                }
                StatementType::Copy => {
                    let header = String::from_utf8_lossy(&stmt);
                    let column_order = postgres_copy::parse_copy_columns(&header);
                    self.current_copy_context =
                        Some((table_name.clone(), column_order, table_id));
                }
                _ => continue,
            }
        }

        Ok(())
    }

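    /// Parses the rows of an INSERT via `mysql_insert::parse_mysql_insert_rows`
    /// and runs each through the common row check; statements that fail to
    /// parse are skipped silently.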
    fn check_insert_statement(
        &mut self,
        stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        stmt_count: u64,
    ) {
        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        let rows = match mysql_insert::parse_mysql_insert_rows(stmt, table_schema) {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_mysql_row(table_id, table_name, &row, stmt_count);
        }
    }

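    /// Variant of the COPY check for a statement that carries its data inline
    /// after `FROM stdin;` rather than as a separate streamed chunk. Not
    /// currently called from `run_data_checks`, which uses
    /// `current_copy_context` plus `check_copy_data` instead.
    #[allow(dead_code)]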
    fn check_copy_statement(
        &mut self,
        stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        stmt_count: u64,
    ) {
        let stmt_str = match std::str::from_utf8(stmt) {
            Ok(s) => s,
            Err(_) => return,
        };

        let data_start = if let Some(pos) = stmt_str.find("FROM stdin;") {
            pos + "FROM stdin;".len()
        } else if let Some(pos) = stmt_str.find("from stdin;") {
            pos + "from stdin;".len()
        } else {
            return;
        };

        let data_section = stmt_str[data_start..].trim_start();
        if data_section.is_empty() {
            return;
        }

        let header = &stmt_str[..data_start];
        let column_order = postgres_copy::parse_copy_columns(header);

        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        let rows = match postgres_copy::parse_postgres_copy_rows(
            data_section.as_bytes(),
            table_schema,
            column_order,
        ) {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_copy_row(table_id, table_name, &row, stmt_count);
        }
    }

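    /// Checks a streamed COPY data chunk (terminated by `\.`) against the
    /// table captured in `current_copy_context`.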
    fn check_copy_data(
        &mut self,
        data_stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        column_order: Vec<String>,
        stmt_count: u64,
    ) {
        let data: Vec<u8> = data_stmt
            .iter()
            .skip_while(|&&b| b == b'\n' || b == b'\r' || b == b' ' || b == b'\t')
            .cloned()
            .collect();

        if data.is_empty() {
            return;
        }

        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        let rows = match postgres_copy::parse_postgres_copy_rows(&data, table_schema, column_order)
        {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_copy_row(table_id, table_name, &row, stmt_count);
        }
    }

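    // Both dialect-specific row types funnel into `check_row_common`.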
    fn check_mysql_row(
        &mut self,
        table_id: TableId,
        table_name: &str,
        row: &mysql_insert::ParsedRow,
        stmt_idx: u64,
    ) {
        self.check_row_common(
            table_id,
            table_name,
            row.pk.as_ref(),
            &row.fk_values,
            stmt_idx,
        );
    }

    fn check_copy_row(
        &mut self,
        table_id: TableId,
        table_name: &str,
        row: &postgres_copy::ParsedCopyRow,
        stmt_idx: u64,
    ) {
        self.check_row_common(
            table_id,
            table_name,
            row.pk.as_ref(),
            &row.fk_values,
            stmt_idx,
        );
    }

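    /// Shared row check: counts the row, detects duplicate primary keys via
    /// the hashed PK set, and queues non-NULL foreign keys for deferred
    /// resolution, respecting the per-table row limit and global key budget.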
    fn check_row_common(
        &mut self,
        table_id: TableId,
        table_name: &str,
        pk: Option<&smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
        fk_values: &[(
            mysql_insert::FkRef,
            smallvec::SmallVec<[mysql_insert::PkValue; 2]>,
        )],
        stmt_idx: u64,
    ) {
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        let max_rows = self.options.max_rows_per_table as u64;

        let state = match self.table_states.get_mut(&table_id) {
            Some(s) => s,
            None => return,
        };

        state.row_count += 1;

        if state.row_count > max_rows {
            if state.pk_values.is_some() {
                state.pk_values = None;
                self.add_issue(
                    ValidationIssue::warning(
                        "PK_CHECK_SKIPPED",
                        format!(
                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
                            table_name, max_rows
                        ),
                    )
                    .with_location(Location::new().with_table(table_name)),
                );
            }
            return;
        }

        if let Some(pk_values) = pk {
            if let Some(ref mut pk_set) = state.pk_values {
                let pk_hash = hash_pk_values(pk_values);

                if pk_set.insert(pk_hash) {
                    self.tracked_pk_count += 1;
                    self.enforce_pk_fk_memory_budget();
                } else {
                    state.pk_duplicates += 1;

                    let pk_display: String = pk_values
                        .iter()
                        .map(|v| match v {
                            mysql_insert::PkValue::Int(i) => i.to_string(),
                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
                            mysql_insert::PkValue::Text(s) => s.to_string(),
                            mysql_insert::PkValue::Null => "NULL".to_string(),
                        })
                        .collect::<Vec<_>>()
                        .join(", ");

                    self.add_issue(
                        ValidationIssue::error(
                            "DUPLICATE_PK",
                            format!(
                                "Duplicate primary key in table '{}': ({})",
                                table_name, pk_display
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(table_name)
                                .with_statement(stmt_idx),
                        ),
                    );
                }
            }
        }

        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        let new_fk_checks: Vec<PendingFkCheck> = {
            let schema = match &self.schema {
                Some(s) => s,
                None => return,
            };

            let table_schema = match schema.table(table_id) {
                Some(t) => t,
                None => return,
            };

            fk_values
                .iter()
                .filter_map(|(fk_ref, fk_vals)| {
                    if fk_vals.iter().all(|v| v.is_null()) {
                        return None;
                    }

                    let fk_def = table_schema.foreign_keys.get(fk_ref.fk_index as usize)?;
                    let parent_table_id = fk_def.referenced_table_id?;

                    let fk_hash = hash_pk_values(fk_vals);

                    Some(PendingFkCheck {
                        child_table_id: table_id,
                        parent_table_id,
                        fk_hash,
                        stmt_idx,
                    })
                })
                .collect()
        };

        let new_count = new_fk_checks.len();
        self.pending_fk_checks.extend(new_fk_checks);
        self.tracked_fk_count += new_count;

        if new_count > 0 {
            self.enforce_pk_fk_memory_budget();
        }
    }

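    /// Resolves every queued FK reference against the parent tables' PK sets.
    /// At most five violations are reported per child table; the rest are
    /// only counted.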
    fn validate_pending_fk_checks(&mut self) {
        for check in std::mem::take(&mut self.pending_fk_checks) {
            // `None` here means the parent table's PK tracking was dropped
            // (row limit exceeded), so the reference cannot be verified either
            // way; skip it rather than report a false positive.
            let parent_has_pk = match self
                .table_states
                .get(&check.parent_table_id)
                .and_then(|s| s.pk_values.as_ref())
                .map(|set| set.contains(&check.fk_hash))
            {
                Some(found) => found,
                None => continue,
            };

            if !parent_has_pk {
                let state = match self.table_states.get_mut(&check.child_table_id) {
                    Some(s) => s,
                    None => continue,
                };
                state.fk_missing_parents += 1;

                if state.fk_missing_parents <= 5 {
                    let (child_name, parent_name) = if let Some(schema) = &self.schema {
                        let child = schema
                            .table(check.child_table_id)
                            .map(|t| t.name.clone())
                            .unwrap_or_else(|| "<unknown>".to_string());
                        let parent = schema
                            .table(check.parent_table_id)
                            .map(|t| t.name.clone())
                            .unwrap_or_else(|| "<unknown>".to_string());
                        (child, parent)
                    } else {
                        ("<unknown>".to_string(), "<unknown>".to_string())
                    };

                    self.add_issue(
                        ValidationIssue::error(
                            "FK_MISSING_PARENT",
                            format!(
                                "FK violation in '{}': references missing row in '{}'",
                                child_name, parent_name
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(child_name)
                                .with_statement(check.stmt_idx),
                        ),
                    );
                }
            }
        }
    }

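    /// Folds the issue list and per-check counters into the final report.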
    fn build_summary(&self) -> ValidationSummary {
        let errors = self
            .issues
            .iter()
            .filter(|i| matches!(i.severity, Severity::Error))
            .count();
        let warnings = self
            .issues
            .iter()
            .filter(|i| matches!(i.severity, Severity::Warning))
            .count();
        let info = self
            .issues
            .iter()
            .filter(|i| matches!(i.severity, Severity::Info))
            .count();

        let syntax_status = if self.syntax_errors > 0 {
            CheckStatus::Failed(self.syntax_errors)
        } else {
            CheckStatus::Ok
        };

        let encoding_status = if self.encoding_warnings > 0 {
            CheckStatus::Failed(self.encoding_warnings)
        } else {
            CheckStatus::Ok
        };

        let ddl_dml_status = if self.ddl_dml_errors > 0 {
            CheckStatus::Failed(self.ddl_dml_errors)
        } else {
            CheckStatus::Ok
        };

        let pk_status = if !self.options.fk_checks_enabled {
            CheckStatus::Skipped("--no-fk-checks".to_string())
        } else if self.pk_fk_checks_disabled_due_to_memory {
            CheckStatus::Skipped("memory limit exceeded".to_string())
        } else if self.pk_errors > 0 {
            CheckStatus::Failed(self.pk_errors)
        } else {
            CheckStatus::Ok
        };

        let fk_status = if !self.options.fk_checks_enabled {
            CheckStatus::Skipped("--no-fk-checks".to_string())
        } else if self.pk_fk_checks_disabled_due_to_memory {
            CheckStatus::Skipped("memory limit exceeded".to_string())
        } else if self.fk_errors > 0 {
            CheckStatus::Failed(self.fk_errors)
        } else {
            CheckStatus::Ok
        };

        ValidationSummary {
            dialect: self.dialect.to_string(),
            issues: self.issues.clone(),
            summary: SummaryStats {
                errors,
                warnings,
                info,
                tables_scanned: self.tables_from_ddl.len(),
                statements_scanned: self.statement_count,
            },
            checks: CheckResults {
                syntax: syntax_status,
                encoding: encoding_status,
                ddl_dml_consistency: ddl_dml_status,
                pk_duplicates: pk_status,
                fk_integrity: fk_status,
            },
        }
    }
}