sql_splitter/validate/
mod.rs

1//! Validate module for SQL dump integrity checking.
2//!
3//! This module provides:
4//! - SQL syntax validation (via parser error detection)
5//! - DDL/DML consistency checks (INSERTs reference existing tables)
6//! - Duplicate primary key detection (all dialects)
7//! - FK referential integrity checking (all dialects)
8//! - Encoding validation (UTF-8)
9
10use crate::parser::{
11    determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
12};
13use crate::progress::ProgressReader;
14use crate::schema::{Schema, SchemaBuilder, TableId};
15use crate::splitter::Compression;
16use ahash::{AHashMap, AHashSet};
17use serde::Serialize;
18use std::fmt;
19use std::fs::File;
20use std::io::Read;
21use std::path::PathBuf;
22use std::sync::Arc;
23
/// Upper bound on the number of issues retained in the report.
///
/// Once this many issues have been recorded, `Validator::add_issue` drops any
/// further issue without counting it; scanning of the dump itself continues.
const MAX_ISSUES: usize = 1000;
26
27/// Issue severity level
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
29#[serde(rename_all = "lowercase")]
30pub enum Severity {
31    Error,
32    Warning,
33    Info,
34}
35
36impl fmt::Display for Severity {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        match self {
39            Severity::Error => write!(f, "ERROR"),
40            Severity::Warning => write!(f, "WARNING"),
41            Severity::Info => write!(f, "INFO"),
42        }
43    }
44}
45
46/// Location in the SQL dump where an issue was found
47#[derive(Debug, Clone, Serialize)]
48pub struct Location {
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub table: Option<String>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub statement_index: Option<u64>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub approx_line: Option<u64>,
55}
56
57impl Location {
58    pub fn new() -> Self {
59        Self {
60            table: None,
61            statement_index: None,
62            approx_line: None,
63        }
64    }
65
66    pub fn with_table(mut self, table: impl Into<String>) -> Self {
67        self.table = Some(table.into());
68        self
69    }
70
71    pub fn with_statement(mut self, index: u64) -> Self {
72        self.statement_index = Some(index);
73        self
74    }
75
76    #[allow(dead_code)]
77    pub fn with_line(mut self, line: u64) -> Self {
78        self.approx_line = Some(line);
79        self
80    }
81}
82
83impl Default for Location {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89/// A validation issue found in the SQL dump
90#[derive(Debug, Clone, Serialize)]
91pub struct ValidationIssue {
92    pub code: &'static str,
93    pub severity: Severity,
94    pub message: String,
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub location: Option<Location>,
97}
98
99impl ValidationIssue {
100    pub fn error(code: &'static str, message: impl Into<String>) -> Self {
101        Self {
102            code,
103            severity: Severity::Error,
104            message: message.into(),
105            location: None,
106        }
107    }
108
109    pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
110        Self {
111            code,
112            severity: Severity::Warning,
113            message: message.into(),
114            location: None,
115        }
116    }
117
118    pub fn info(code: &'static str, message: impl Into<String>) -> Self {
119        Self {
120            code,
121            severity: Severity::Info,
122            message: message.into(),
123            location: None,
124        }
125    }
126
127    pub fn with_location(mut self, location: Location) -> Self {
128        self.location = Some(location);
129        self
130    }
131}
132
133impl fmt::Display for ValidationIssue {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        write!(f, "{} [{}]", self.severity, self.code)?;
136        if let Some(ref loc) = self.location {
137            if let Some(ref table) = loc.table {
138                write!(f, " table={}", table)?;
139            }
140            if let Some(stmt) = loc.statement_index {
141                write!(f, " stmt={}", stmt)?;
142            }
143            if let Some(line) = loc.approx_line {
144                write!(f, " line~{}", line)?;
145            }
146        }
147        write!(f, ": {}", self.message)
148    }
149}
150
/// Options controlling a validation run.
#[derive(Debug, Clone)]
pub struct ValidateOptions {
    /// Path of the SQL dump to validate.
    pub path: PathBuf,
    /// SQL dialect of the dump; the validator falls back to MySQL when `None`.
    pub dialect: Option<SqlDialect>,
    // NOTE(review): `progress`, `strict` and `json` are not read anywhere in this
    // module; presumably they are consumed by the command-line front end.
    pub progress: bool,
    pub strict: bool,
    pub json: bool,
    /// Per-table row limit; once a table exceeds it, PK/FK tracking for that
    /// table is dropped and a `PK_CHECK_SKIPPED` warning is emitted.
    pub max_rows_per_table: usize,
    /// Enables the second pass over the file (PK duplicate and FK checks).
    pub fk_checks_enabled: bool,
}
162
/// Result of a validation run: the collected issues plus aggregate statistics.
#[derive(Debug, Serialize)]
pub struct ValidationSummary {
    /// Display name of the dialect the dump was parsed with.
    pub dialect: String,
    /// Issues that were recorded, in the order they were added.
    pub issues: Vec<ValidationIssue>,
    /// Aggregate counts over `issues` and the scanned input.
    pub summary: SummaryStats,
    /// Per-check outcome.
    pub checks: CheckResults,
}
171
/// Aggregate counts reported alongside the issue list.
#[derive(Debug, Serialize)]
pub struct SummaryStats {
    /// Number of recorded issues with error severity.
    pub errors: usize,
    /// Number of recorded issues with warning severity.
    pub warnings: usize,
    /// Number of recorded issues with info severity.
    pub info: usize,
    /// Number of distinct table names seen in CREATE TABLE statements.
    pub tables_scanned: usize,
    /// Number of statements read during the first pass.
    pub statements_scanned: u64,
}
180
/// Outcome of each individual check performed by the validator.
#[derive(Debug, Serialize)]
pub struct CheckResults {
    /// Parser errors encountered while reading statements.
    pub syntax: CheckStatus,
    /// Statements containing invalid UTF-8.
    pub encoding: CheckStatus,
    /// INSERT/COPY statements targeting tables without a CREATE TABLE.
    pub ddl_dml_consistency: CheckStatus,
    /// Duplicate primary keys (skipped when FK checks are disabled).
    pub pk_duplicates: CheckStatus,
    /// Foreign keys referencing missing parent rows (skipped when FK checks are disabled).
    pub fk_integrity: CheckStatus,
}
189
190#[derive(Debug, Serialize)]
191#[serde(rename_all = "lowercase")]
192pub enum CheckStatus {
193    Ok,
194    Failed(usize),
195    Skipped(String),
196}
197
198impl fmt::Display for CheckStatus {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        match self {
201            CheckStatus::Ok => write!(f, "OK"),
202            CheckStatus::Failed(n) => write!(f, "{} issues", n),
203            CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
204        }
205    }
206}
207
208impl ValidationSummary {
209    pub fn has_errors(&self) -> bool {
210        self.summary.errors > 0
211    }
212
213    pub fn has_warnings(&self) -> bool {
214        self.summary.warnings > 0
215    }
216}
217
/// Primary key tuple for duplicate detection.
///
/// One byte string per key column: integers are stored as their decimal text,
/// text values as their raw bytes, and NULL as an empty byte string. The same
/// encoding is used for foreign key tuples so they can be looked up in a
/// parent's primary key set.
type PkTuple = Vec<Vec<u8>>;
220
/// Pending FK check to be validated after all PKs are loaded.
///
/// One entry is queued per non-NULL foreign key value found in a child row.
struct PendingFkCheck {
    /// Table containing the referencing row.
    child_table_id: TableId,
    /// Name of the child table, used in the issue message and location.
    child_table_name: String,
    /// Table whose primary key set is searched.
    parent_table_id: TableId,
    /// Name of the referenced table, used in the issue message.
    parent_table_name: String,
    /// Encoded FK values, comparable with the parent's `PkTuple` entries.
    fk_tuple: PkTuple,
    /// Comma-separated, human-readable rendering of the FK values.
    fk_display: String,
    /// Index of the statement (counted during the second pass) holding the row.
    stmt_idx: u64,
}
231
232/// Per-table tracking state for data checks
233struct TableState {
234    row_count: u64,
235    pk_values: Option<AHashSet<PkTuple>>,
236    pk_column_indices: Vec<usize>,
237    pk_duplicates: u64,
238    fk_missing_parents: u64,
239}
240
241impl TableState {
242    fn new() -> Self {
243        Self {
244            row_count: 0,
245            pk_values: Some(AHashSet::new()),
246            pk_column_indices: Vec::new(),
247            pk_duplicates: 0,
248            fk_missing_parents: 0,
249        }
250    }
251
252    fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
253        self.pk_column_indices = indices;
254        self
255    }
256}
257
/// SQL dump validator.
///
/// Built with [`Validator::new`], optionally given a progress callback via
/// `with_progress`, and consumed by `validate`, which reads the dump in up to
/// two passes and returns a `ValidationSummary`.
pub struct Validator {
    options: ValidateOptions,
    // Issues recorded so far (capped at MAX_ISSUES by `add_issue`)
    issues: Vec<ValidationIssue>,
    // Dialect used for parsing; taken from the options, MySQL if unspecified
    dialect: SqlDialect,

    // DDL/DML tracking
    tables_from_ddl: AHashSet<String>,
    tables_from_dml: Vec<(String, u64)>, // (table_name, statement_index)

    // Schema for PK/FK data checks, built from CREATE/ALTER TABLE statements
    // seen in pass 1 (all dialects)
    schema_builder: SchemaBuilder,
    schema: Option<Schema>,

    // Per-table state for data checks
    table_states: AHashMap<TableId, TableState>,

    // Pending FK checks (deferred until all PKs are loaded)
    pending_fk_checks: Vec<PendingFkCheck>,

    // Progress callback for byte-based progress tracking (Arc for reuse across passes)
    progress_fn: Option<Arc<dyn Fn(u64) + Send + Sync>>,

    // Counters
    // statement_count: statements read in pass 1; the remaining counters are
    // incremented by `add_issue` for the matching issue code
    statement_count: u64,
    syntax_errors: usize,
    encoding_warnings: usize,
    ddl_dml_errors: usize,
    pk_errors: usize,
    fk_errors: usize,
}
289
290impl Validator {
291    pub fn new(options: ValidateOptions) -> Self {
292        Self {
293            dialect: options.dialect.unwrap_or(SqlDialect::MySql),
294            options,
295            issues: Vec::new(),
296            tables_from_ddl: AHashSet::new(),
297            tables_from_dml: Vec::new(),
298            schema_builder: SchemaBuilder::new(),
299            schema: None,
300            table_states: AHashMap::new(),
301            pending_fk_checks: Vec::new(),
302            progress_fn: None,
303            statement_count: 0,
304            syntax_errors: 0,
305            encoding_warnings: 0,
306            ddl_dml_errors: 0,
307            pk_errors: 0,
308            fk_errors: 0,
309        }
310    }
311
312    /// Set a progress callback for byte-based progress tracking.
313    /// The callback receives cumulative bytes read across both validation passes.
314    pub fn with_progress<F>(mut self, f: F) -> Self
315    where
316        F: Fn(u64) + Send + Sync + 'static,
317    {
318        self.progress_fn = Some(Arc::new(f));
319        self
320    }
321
322    fn add_issue(&mut self, issue: ValidationIssue) {
323        if self.issues.len() >= MAX_ISSUES {
324            return;
325        }
326
327        match issue.severity {
328            Severity::Error => match issue.code {
329                "SYNTAX" => self.syntax_errors += 1,
330                "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
331                "DUPLICATE_PK" => self.pk_errors += 1,
332                "FK_MISSING_PARENT" => self.fk_errors += 1,
333                _ => {}
334            },
335            Severity::Warning => {
336                if issue.code == "ENCODING" {
337                    self.encoding_warnings += 1;
338                }
339            }
340            Severity::Info => {}
341        }
342
343        self.issues.push(issue);
344    }
345
346    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
347        let file = File::open(&self.options.path)?;
348        let file_size = file.metadata()?.len();
349        let buffer_size = determine_buffer_size(file_size);
350
351        // Pass 1 reports bytes as 0 to file_size/2 (first half of progress bar)
352        let compression = Compression::from_path(&self.options.path);
353        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
354            let cb = Arc::clone(cb);
355            let progress_reader = ProgressReader::new(file, move |bytes| {
356                // Scale to first half: 0% to 50%
357                cb(bytes / 2)
358            });
359            compression.wrap_reader(Box::new(progress_reader))
360        } else {
361            compression.wrap_reader(Box::new(file))
362        };
363
364        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
365
366        // Pass 1: Build schema and check DDL/DML consistency
367        loop {
368            match parser.read_statement() {
369                Ok(Some(stmt)) => {
370                    self.statement_count += 1;
371                    self.process_statement(&stmt);
372                }
373                Ok(None) => break,
374                Err(e) => {
375                    self.add_issue(
376                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
377                            .with_location(
378                                Location::new().with_statement(self.statement_count + 1),
379                            ),
380                    );
381                    break;
382                }
383            }
384        }
385
386        // Check for DML referencing missing tables - collect issues first, then add them
387        let missing_table_issues: Vec<_> = self
388            .tables_from_dml
389            .iter()
390            .filter(|(table, _)| {
391                let table_lower = table.to_lowercase();
392                !self
393                    .tables_from_ddl
394                    .iter()
395                    .any(|t| t.to_lowercase() == table_lower)
396            })
397            .map(|(table, stmt_idx)| {
398                ValidationIssue::error(
399                    "DDL_MISSING_TABLE",
400                    format!(
401                        "INSERT/COPY references table '{}' with no CREATE TABLE",
402                        table
403                    ),
404                )
405                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
406            })
407            .collect();
408
409        for issue in missing_table_issues {
410            self.add_issue(issue);
411        }
412
413        // Finalize schema and resolve FK references for data checks (all dialects)
414        if self.options.fk_checks_enabled {
415            self.schema = Some(self.schema_builder.build());
416            self.schema_builder = SchemaBuilder::new(); // Reset to avoid double use
417            self.initialize_table_states();
418        }
419
420        // Pass 2: Data checks (PK + collect FK refs) - requires re-reading the file
421        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
422        if self.options.fk_checks_enabled && schema_not_empty {
423            self.run_data_checks()?;
424            // Now that all PKs are loaded, validate the collected FK references
425            self.validate_pending_fk_checks();
426        }
427
428        Ok(self.build_summary())
429    }
430
431    fn process_statement(&mut self, stmt: &[u8]) {
432        // Check encoding
433        if std::str::from_utf8(stmt).is_err() {
434            self.add_issue(
435                ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
436                    .with_location(Location::new().with_statement(self.statement_count)),
437            );
438        }
439
440        let (stmt_type, table_name) =
441            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);
442
443        match stmt_type {
444            StatementType::CreateTable => {
445                if !table_name.is_empty() {
446                    self.tables_from_ddl.insert(table_name.clone());
447
448                    // Parse CREATE TABLE for schema info (all dialects supported)
449                    if let Ok(stmt_str) = std::str::from_utf8(stmt) {
450                        self.schema_builder.parse_create_table(stmt_str);
451                    }
452                }
453            }
454            StatementType::AlterTable => {
455                // Parse ALTER TABLE for FK constraints (all dialects supported)
456                if let Ok(stmt_str) = std::str::from_utf8(stmt) {
457                    self.schema_builder.parse_alter_table(stmt_str);
458                }
459            }
460            StatementType::Insert | StatementType::Copy => {
461                if !table_name.is_empty() {
462                    self.tables_from_dml
463                        .push((table_name, self.statement_count));
464                }
465            }
466            StatementType::Unknown => {
467                // Could be a session command or comment - not an error
468            }
469            _ => {}
470        }
471    }
472
473    fn initialize_table_states(&mut self) {
474        let schema = match &self.schema {
475            Some(s) => s,
476            None => return,
477        };
478
479        for table_schema in schema.iter() {
480            let pk_indices: Vec<usize> = table_schema
481                .primary_key
482                .iter()
483                .map(|col_id| col_id.0 as usize)
484                .collect();
485
486            let state = TableState::new().with_pk_columns(pk_indices);
487            self.table_states.insert(table_schema.id, state);
488        }
489    }
490
491    fn run_data_checks(&mut self) -> anyhow::Result<()> {
492        let file = File::open(&self.options.path)?;
493        let file_size = file.metadata()?.len();
494        let buffer_size = determine_buffer_size(file_size);
495
496        // Pass 2 reports bytes as file_size/2 to file_size (second half of progress bar)
497        let compression = Compression::from_path(&self.options.path);
498        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
499            let cb = Arc::clone(cb);
500            let progress_reader = ProgressReader::new(file, move |bytes| {
501                // Scale to second half: 50% to 100%
502                cb(file_size / 2 + bytes / 2)
503            });
504            compression.wrap_reader(Box::new(progress_reader))
505        } else {
506            compression.wrap_reader(Box::new(file))
507        };
508
509        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
510        let mut stmt_count: u64 = 0;
511
512        while let Some(stmt) = parser.read_statement()? {
513            stmt_count += 1;
514
515            let (stmt_type, table_name) =
516                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);
517
518            // Get table_id without holding a borrow on self.schema
519            let table_id = match &self.schema {
520                Some(s) => match s.get_table_id(&table_name) {
521                    Some(id) => id,
522                    None => continue,
523                },
524                None => continue,
525            };
526
527            match stmt_type {
528                StatementType::Insert => {
529                    // MySQL and SQLite use INSERT VALUES syntax
530                    self.check_insert_statement(&stmt, table_id, &table_name, stmt_count);
531                }
532                StatementType::Copy => {
533                    // PostgreSQL uses COPY ... FROM stdin format
534                    self.check_copy_statement(&stmt, table_id, &table_name, stmt_count);
535                }
536                _ => continue,
537            }
538        }
539
540        Ok(())
541    }
542
543    /// Check rows from a MySQL/SQLite INSERT statement
544    fn check_insert_statement(
545        &mut self,
546        stmt: &[u8],
547        table_id: TableId,
548        table_name: &str,
549        stmt_count: u64,
550    ) {
551        let table_schema = match &self.schema {
552            Some(s) => match s.table(table_id) {
553                Some(ts) => ts,
554                None => return,
555            },
556            None => return,
557        };
558
559        // Parse rows from INSERT using the schema (works for MySQL and SQLite)
560        let rows = match mysql_insert::parse_mysql_insert_rows(stmt, table_schema) {
561            Ok(r) => r,
562            Err(_) => return,
563        };
564
565        for row in rows {
566            self.check_mysql_row(table_id, table_name, &row, stmt_count);
567        }
568    }
569
570    /// Check rows from a PostgreSQL COPY statement
571    fn check_copy_statement(
572        &mut self,
573        stmt: &[u8],
574        table_id: TableId,
575        table_name: &str,
576        stmt_count: u64,
577    ) {
578        // Find the COPY header line and the data section
579        let stmt_str = match std::str::from_utf8(stmt) {
580            Ok(s) => s,
581            Err(_) => return,
582        };
583
584        // Find the data section (after the header line ending with "FROM stdin;")
585        let data_start = if let Some(pos) = stmt_str.find("FROM stdin;") {
586            pos + "FROM stdin;".len()
587        } else if let Some(pos) = stmt_str.find("from stdin;") {
588            pos + "from stdin;".len()
589        } else {
590            return;
591        };
592
593        // Skip any whitespace/newlines after the header
594        let data_section = stmt_str[data_start..].trim_start();
595        if data_section.is_empty() {
596            return;
597        }
598
599        // Parse column list from the header
600        let header = &stmt_str[..data_start];
601        let column_order = postgres_copy::parse_copy_columns(header);
602
603        // Get table schema
604        let table_schema = match &self.schema {
605            Some(s) => match s.table(table_id) {
606                Some(ts) => ts,
607                None => return,
608            },
609            None => return,
610        };
611
612        // Parse the COPY data rows
613        let rows = match postgres_copy::parse_postgres_copy_rows(
614            data_section.as_bytes(),
615            table_schema,
616            column_order,
617        ) {
618            Ok(r) => r,
619            Err(_) => return,
620        };
621
622        for row in rows {
623            self.check_copy_row(table_id, table_name, &row, stmt_count);
624        }
625    }
626
627    /// Check a row from MySQL INSERT or SQLite INSERT
628    fn check_mysql_row(
629        &mut self,
630        table_id: TableId,
631        table_name: &str,
632        row: &mysql_insert::ParsedRow,
633        stmt_idx: u64,
634    ) {
635        self.check_row_common(
636            table_id,
637            table_name,
638            row.pk.as_ref(),
639            &row.fk_values,
640            stmt_idx,
641        );
642    }
643
644    /// Check a row from PostgreSQL COPY
645    fn check_copy_row(
646        &mut self,
647        table_id: TableId,
648        table_name: &str,
649        row: &postgres_copy::ParsedCopyRow,
650        stmt_idx: u64,
651    ) {
652        self.check_row_common(
653            table_id,
654            table_name,
655            row.pk.as_ref(),
656            &row.fk_values,
657            stmt_idx,
658        );
659    }
660
661    /// Common row checking logic for all dialects
662    fn check_row_common(
663        &mut self,
664        table_id: TableId,
665        table_name: &str,
666        pk: Option<&smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
667        fk_values: &[(mysql_insert::FkRef, smallvec::SmallVec<[mysql_insert::PkValue; 2]>)],
668        stmt_idx: u64,
669    ) {
670        let max_rows = self.options.max_rows_per_table as u64;
671
672        let state = match self.table_states.get_mut(&table_id) {
673            Some(s) => s,
674            None => return,
675        };
676
677        state.row_count += 1;
678
679        // Check if we've exceeded max rows for this table
680        if state.row_count > max_rows {
681            if state.pk_values.is_some() {
682                state.pk_values = None;
683                self.add_issue(
684                    ValidationIssue::warning(
685                        "PK_CHECK_SKIPPED",
686                        format!(
687                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
688                            table_name, max_rows
689                        ),
690                    )
691                    .with_location(Location::new().with_table(table_name)),
692                );
693            }
694            return;
695        }
696
697        // PK duplicate check using the parsed PK from the row
698        if let Some(pk_values) = pk {
699            if let Some(ref mut pk_set) = state.pk_values {
700                // Convert SmallVec<[PkValue; 2]> to Vec<Vec<u8>> for our set
701                let pk_tuple: PkTuple = pk_values
702                    .iter()
703                    .map(|v| match v {
704                        mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
705                        mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
706                        mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
707                        mysql_insert::PkValue::Null => Vec::new(),
708                    })
709                    .collect();
710
711                if !pk_set.insert(pk_tuple.clone()) {
712                    state.pk_duplicates += 1;
713                    let pk_display: String = pk_values
714                        .iter()
715                        .map(|v| match v {
716                            mysql_insert::PkValue::Int(i) => i.to_string(),
717                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
718                            mysql_insert::PkValue::Text(s) => s.to_string(),
719                            mysql_insert::PkValue::Null => "NULL".to_string(),
720                        })
721                        .collect::<Vec<_>>()
722                        .join(", ");
723
724                    self.add_issue(
725                        ValidationIssue::error(
726                            "DUPLICATE_PK",
727                            format!(
728                                "Duplicate primary key in table '{}': ({})",
729                                table_name, pk_display
730                            ),
731                        )
732                        .with_location(
733                            Location::new()
734                                .with_table(table_name)
735                                .with_statement(stmt_idx),
736                        ),
737                    );
738                }
739            }
740        }
741
742        // Collect FK references for deferred validation (after all PKs are loaded)
743        let schema = match &self.schema {
744            Some(s) => s,
745            None => return,
746        };
747
748        let table_schema = match schema.table(table_id) {
749            Some(t) => t,
750            None => return,
751        };
752
753        for (fk_ref, fk_vals) in fk_values.iter() {
754            // Skip if all FK values are NULL (nullable FK)
755            if fk_vals.iter().all(|v| v.is_null()) {
756                continue;
757            }
758
759            let fk_def = match table_schema.foreign_keys.get(fk_ref.fk_index as usize) {
760                Some(fk) => fk,
761                None => continue,
762            };
763
764            let parent_table_id = match fk_def.referenced_table_id {
765                Some(id) => id,
766                None => continue,
767            };
768
769            let fk_tuple: PkTuple = fk_vals
770                .iter()
771                .map(|v| match v {
772                    mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
773                    mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
774                    mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
775                    mysql_insert::PkValue::Null => Vec::new(),
776                })
777                .collect();
778
779            let fk_display: String = fk_vals
780                .iter()
781                .map(|v| match v {
782                    mysql_insert::PkValue::Int(i) => i.to_string(),
783                    mysql_insert::PkValue::BigInt(i) => i.to_string(),
784                    mysql_insert::PkValue::Text(s) => s.to_string(),
785                    mysql_insert::PkValue::Null => "NULL".to_string(),
786                })
787                .collect::<Vec<_>>()
788                .join(", ");
789
790            self.pending_fk_checks.push(PendingFkCheck {
791                child_table_id: table_id,
792                child_table_name: table_name.to_string(),
793                parent_table_id,
794                parent_table_name: fk_def.referenced_table.clone(),
795                fk_tuple,
796                fk_display,
797                stmt_idx,
798            });
799        }
800    }
801
802    /// Validate all collected FK references after all PKs are loaded
803    fn validate_pending_fk_checks(&mut self) {
804        for check in std::mem::take(&mut self.pending_fk_checks) {
805            let parent_has_pk = self
806                .table_states
807                .get(&check.parent_table_id)
808                .and_then(|s| s.pk_values.as_ref())
809                .is_some_and(|set| set.contains(&check.fk_tuple));
810
811            if !parent_has_pk {
812                let state = match self.table_states.get_mut(&check.child_table_id) {
813                    Some(s) => s,
814                    None => continue,
815                };
816                state.fk_missing_parents += 1;
817
818                // Only add issue for first few violations per table
819                if state.fk_missing_parents <= 5 {
820                    self.add_issue(
821                        ValidationIssue::error(
822                            "FK_MISSING_PARENT",
823                            format!(
824                                "FK violation in '{}': ({}) references missing row in '{}'",
825                                check.child_table_name, check.fk_display, check.parent_table_name
826                            ),
827                        )
828                        .with_location(
829                            Location::new()
830                                .with_table(&check.child_table_name)
831                                .with_statement(check.stmt_idx),
832                        ),
833                    );
834                }
835            }
836        }
837    }
838
839    fn build_summary(&self) -> ValidationSummary {
840        let errors = self
841            .issues
842            .iter()
843            .filter(|i| matches!(i.severity, Severity::Error))
844            .count();
845        let warnings = self
846            .issues
847            .iter()
848            .filter(|i| matches!(i.severity, Severity::Warning))
849            .count();
850        let info = self
851            .issues
852            .iter()
853            .filter(|i| matches!(i.severity, Severity::Info))
854            .count();
855
856        let syntax_status = if self.syntax_errors > 0 {
857            CheckStatus::Failed(self.syntax_errors)
858        } else {
859            CheckStatus::Ok
860        };
861
862        let encoding_status = if self.encoding_warnings > 0 {
863            CheckStatus::Failed(self.encoding_warnings)
864        } else {
865            CheckStatus::Ok
866        };
867
868        let ddl_dml_status = if self.ddl_dml_errors > 0 {
869            CheckStatus::Failed(self.ddl_dml_errors)
870        } else {
871            CheckStatus::Ok
872        };
873
874        let pk_status = if !self.options.fk_checks_enabled {
875            CheckStatus::Skipped("--no-fk-checks".to_string())
876        } else if self.pk_errors > 0 {
877            CheckStatus::Failed(self.pk_errors)
878        } else {
879            CheckStatus::Ok
880        };
881
882        let fk_status = if !self.options.fk_checks_enabled {
883            CheckStatus::Skipped("--no-fk-checks".to_string())
884        } else if self.fk_errors > 0 {
885            CheckStatus::Failed(self.fk_errors)
886        } else {
887            CheckStatus::Ok
888        };
889
890        ValidationSummary {
891            dialect: self.dialect.to_string(),
892            issues: self.issues.clone(),
893            summary: SummaryStats {
894                errors,
895                warnings,
896                info,
897                tables_scanned: self.tables_from_ddl.len(),
898                statements_scanned: self.statement_count,
899            },
900            checks: CheckResults {
901                syntax: syntax_status,
902                encoding: encoding_status,
903                ddl_dml_consistency: ddl_dml_status,
904                pk_duplicates: pk_status,
905                fk_integrity: fk_status,
906            },
907        }
908    }
909}