sql_splitter/validate/
mod.rs

1//! Validate module for SQL dump integrity checking.
2//!
3//! This module provides:
4//! - SQL syntax validation (via parser error detection)
5//! - DDL/DML consistency checks (INSERTs reference existing tables)
6//! - Duplicate primary key detection (all dialects)
7//! - FK referential integrity checking (all dialects)
8//! - Encoding validation (UTF-8)
9
10use crate::parser::{
11    determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
12};
13use crate::progress::ProgressReader;
14use crate::schema::{Schema, SchemaBuilder, TableId};
15use crate::splitter::Compression;
16use ahash::{AHashMap, AHashSet};
17use serde::Serialize;
18use std::fmt;
19use std::fs::File;
20use std::hash::{Hash, Hasher};
21use std::io::Read;
22use std::path::PathBuf;
23use std::sync::Arc;
24
/// Maximum number of issues retained in memory; `add_issue` silently drops
/// anything beyond this cap (per-check counters also stop advancing then).
const MAX_ISSUES: usize = 1000;
27
/// Issue severity level.
///
/// Serialized in lowercase (e.g. `"error"`); rendered in uppercase for
/// human-readable output by its `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    Error,
    Warning,
    Info,
}
36
37impl fmt::Display for Severity {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            Severity::Error => write!(f, "ERROR"),
41            Severity::Warning => write!(f, "WARNING"),
42            Severity::Info => write!(f, "INFO"),
43        }
44    }
45}
46
/// Location in the SQL dump where an issue was found.
///
/// All fields are optional; `None` fields are omitted from serialized output.
#[derive(Debug, Clone, Serialize)]
pub struct Location {
    /// Table the issue relates to, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table: Option<String>,
    /// 1-based index of the statement within the dump, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub statement_index: Option<u64>,
    /// Approximate line number in the dump, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approx_line: Option<u64>,
}
57
58impl Location {
59    pub fn new() -> Self {
60        Self {
61            table: None,
62            statement_index: None,
63            approx_line: None,
64        }
65    }
66
67    pub fn with_table(mut self, table: impl Into<String>) -> Self {
68        self.table = Some(table.into());
69        self
70    }
71
72    pub fn with_statement(mut self, index: u64) -> Self {
73        self.statement_index = Some(index);
74        self
75    }
76
77    #[allow(dead_code)]
78    pub fn with_line(mut self, line: u64) -> Self {
79        self.approx_line = Some(line);
80        self
81    }
82}
83
84impl Default for Location {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
/// A validation issue found in the SQL dump.
#[derive(Debug, Clone, Serialize)]
pub struct ValidationIssue {
    /// Stable machine-readable code (e.g. "SYNTAX", "DUPLICATE_PK").
    pub code: &'static str,
    /// Error / warning / info classification.
    pub severity: Severity,
    /// Human-readable description of the problem.
    pub message: String,
    /// Where in the dump the issue occurred, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub location: Option<Location>,
}
99
100impl ValidationIssue {
101    pub fn error(code: &'static str, message: impl Into<String>) -> Self {
102        Self {
103            code,
104            severity: Severity::Error,
105            message: message.into(),
106            location: None,
107        }
108    }
109
110    pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
111        Self {
112            code,
113            severity: Severity::Warning,
114            message: message.into(),
115            location: None,
116        }
117    }
118
119    pub fn info(code: &'static str, message: impl Into<String>) -> Self {
120        Self {
121            code,
122            severity: Severity::Info,
123            message: message.into(),
124            location: None,
125        }
126    }
127
128    pub fn with_location(mut self, location: Location) -> Self {
129        self.location = Some(location);
130        self
131    }
132}
133
134impl fmt::Display for ValidationIssue {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        write!(f, "{} [{}]", self.severity, self.code)?;
137        if let Some(ref loc) = self.location {
138            if let Some(ref table) = loc.table {
139                write!(f, " table={}", table)?;
140            }
141            if let Some(stmt) = loc.statement_index {
142                write!(f, " stmt={}", stmt)?;
143            }
144            if let Some(line) = loc.approx_line {
145                write!(f, " line~{}", line)?;
146            }
147        }
148        write!(f, ": {}", self.message)
149    }
150}
151
/// Validation options.
#[derive(Debug, Clone)]
pub struct ValidateOptions {
    /// Path to the SQL dump file (possibly compressed; see `Compression::from_path`).
    pub path: PathBuf,
    /// SQL dialect; when `None`, MySQL is assumed.
    pub dialect: Option<SqlDialect>,
    /// Progress flag. NOTE(review): not read within this module's visible
    /// code — progress reporting is driven by `Validator::with_progress`.
    pub progress: bool,
    /// Strict-mode flag. NOTE(review): not read within this module's visible code.
    pub strict: bool,
    /// JSON-output flag. NOTE(review): not read within this module's visible code.
    pub json: bool,
    /// Per-table row cap: past this many rows, PK/FK checks for that table
    /// are skipped with a warning.
    pub max_rows_per_table: usize,
    /// Master switch for the second data-check pass (PK duplicates + FK integrity).
    pub fk_checks_enabled: bool,
    /// Optional global cap on tracked PK/FK keys for memory safety.
    /// When exceeded, PK/FK checks are skipped for the remainder of the run.
    /// If None, no limit is enforced (default).
    pub max_pk_fk_keys: Option<usize>,
}
167
/// Validation summary with collected issues.
#[derive(Debug, Serialize)]
pub struct ValidationSummary {
    /// Name of the dialect the dump was validated as.
    pub dialect: String,
    /// Individual issues, capped at `MAX_ISSUES`.
    pub issues: Vec<ValidationIssue>,
    /// Aggregate counts across all checks.
    pub summary: SummaryStats,
    /// Per-check outcome (ok / failed / skipped).
    pub checks: CheckResults,
}
176
/// Aggregate counters for a validation run.
#[derive(Debug, Serialize)]
pub struct SummaryStats {
    /// Number of error-severity issues.
    pub errors: usize,
    /// Number of warning-severity issues.
    pub warnings: usize,
    /// Number of info-severity issues.
    pub info: usize,
    /// Number of tables encountered.
    pub tables_scanned: usize,
    /// Number of statements parsed.
    pub statements_scanned: u64,
}
185
/// Per-check status for each validation category.
#[derive(Debug, Serialize)]
pub struct CheckResults {
    /// SQL syntax validation (parser error detection).
    pub syntax: CheckStatus,
    /// UTF-8 encoding validation.
    pub encoding: CheckStatus,
    /// INSERT/COPY statements referencing tables with no CREATE TABLE.
    pub ddl_dml_consistency: CheckStatus,
    /// Duplicate primary key detection.
    pub pk_duplicates: CheckStatus,
    /// Foreign key referential integrity.
    pub fk_integrity: CheckStatus,
}
194
/// Outcome of a single validation check.
///
/// NOTE(review): with serde's default externally-tagged representation, the
/// tuple variants serialize as objects, e.g. `{"failed": 3}` — confirm this
/// matches the consuming tooling's expectations.
#[derive(Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum CheckStatus {
    Ok,
    /// Check ran and found this many issues.
    Failed(usize),
    /// Check did not run; the payload explains why.
    Skipped(String),
}
202
203impl fmt::Display for CheckStatus {
204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205        match self {
206            CheckStatus::Ok => write!(f, "OK"),
207            CheckStatus::Failed(n) => write!(f, "{} issues", n),
208            CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
209        }
210    }
211}
212
213impl ValidationSummary {
214    pub fn has_errors(&self) -> bool {
215        self.summary.errors > 0
216    }
217
218    pub fn has_warnings(&self) -> bool {
219        self.summary.warnings > 0
220    }
221}
222
/// Compact primary/foreign key representation for duplicate and FK checks.
/// We use a 64-bit hash; collision risk is negligible for realistic dumps,
/// and storing 8 bytes per key keeps the tracked-key sets small.
type PkHash = u64;
226
227/// Hash a list of PK/FK values into a compact 64-bit hash.
228/// Uses AHash for fast, high-quality hashing.
229fn hash_pk_values(values: &smallvec::SmallVec<[mysql_insert::PkValue; 2]>) -> PkHash {
230    let mut hasher = ahash::AHasher::default();
231
232    // Include arity (number of columns) in the hash to distinguish (1) from (1, NULL)
233    (values.len() as u8).hash(&mut hasher);
234
235    for v in values {
236        match v {
237            mysql_insert::PkValue::Int(i) => {
238                0u8.hash(&mut hasher);
239                i.hash(&mut hasher);
240            }
241            mysql_insert::PkValue::BigInt(i) => {
242                1u8.hash(&mut hasher);
243                i.hash(&mut hasher);
244            }
245            mysql_insert::PkValue::Text(s) => {
246                2u8.hash(&mut hasher);
247                s.hash(&mut hasher);
248            }
249            mysql_insert::PkValue::Null => {
250                3u8.hash(&mut hasher);
251            }
252        }
253    }
254
255    hasher.finish()
256}
257
/// Pending FK check to be validated after all PKs are loaded.
/// Uses compact hash representation to minimize memory usage.
struct PendingFkCheck {
    /// Table containing the referencing row (charged with the violation).
    child_table_id: TableId,
    /// Table whose PK set must contain `fk_hash`.
    parent_table_id: TableId,
    /// Hash of the FK value tuple (see `hash_pk_values`).
    fk_hash: PkHash,
    /// Statement index of the referencing row, for issue locations.
    stmt_idx: u64,
}
266
/// Per-table tracking state for data checks.
/// Uses hashed PK values to minimize memory usage.
struct TableState {
    /// Rows seen for this table so far in pass 2.
    row_count: u64,
    /// Set of hashed PKs for duplicate and FK parent existence checks.
    /// When None, PK/FK checks for this table are skipped (due to row or memory limits).
    pk_values: Option<AHashSet<PkHash>>,
    /// Column positions of the primary key within a row.
    pk_column_indices: Vec<usize>,
    /// Number of duplicate primary keys detected.
    pk_duplicates: u64,
    /// Number of FK references whose parent row was missing.
    fk_missing_parents: u64,
}
278
279impl TableState {
280    fn new() -> Self {
281        Self {
282            row_count: 0,
283            pk_values: Some(AHashSet::new()),
284            pk_column_indices: Vec::new(),
285            pk_duplicates: 0,
286            fk_missing_parents: 0,
287        }
288    }
289
290    fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
291        self.pk_column_indices = indices;
292        self
293    }
294}
295
/// SQL dump validator.
///
/// Drives a two-pass validation: pass 1 parses every statement to check
/// syntax/encoding, build the schema, and track DDL/DML usage; pass 2
/// (optional) re-reads the dump for PK-duplicate and FK-integrity checks.
pub struct Validator {
    options: ValidateOptions,
    /// Issues collected so far (capped at `MAX_ISSUES`).
    issues: Vec<ValidationIssue>,
    /// Resolved dialect (`options.dialect` or the MySQL fallback).
    dialect: SqlDialect,

    // DDL/DML tracking
    /// Table names seen in CREATE TABLE statements.
    tables_from_ddl: AHashSet<String>,
    tables_from_dml: Vec<(String, u64)>, // (table_name, statement_index)

    // Schema for PK/FK checks (all dialects per the module docs)
    schema_builder: SchemaBuilder,
    /// Built schema; populated after pass 1 when FK checks are enabled.
    schema: Option<Schema>,

    // Per-table state for data checks
    table_states: AHashMap<TableId, TableState>,

    // Pending FK checks (deferred until all PKs are loaded)
    pending_fk_checks: Vec<PendingFkCheck>,

    // Progress callback for byte-based progress tracking (Arc for reuse across passes)
    progress_fn: Option<Arc<dyn Fn(u64) + Send + Sync>>,

    // Counters
    statement_count: u64,
    syntax_errors: usize,
    encoding_warnings: usize,
    ddl_dml_errors: usize,
    pk_errors: usize,
    fk_errors: usize,

    // Memory tracking for PK/FK checks
    tracked_pk_count: usize,
    tracked_fk_count: usize,
    /// Latched once `max_pk_fk_keys` is exceeded; disables further PK/FK checks.
    pk_fk_checks_disabled_due_to_memory: bool,

    // PostgreSQL COPY context: (table_name, column_order, table_id)
    current_copy_context: Option<(String, Vec<String>, TableId)>,
}
335
336impl Validator {
337    pub fn new(options: ValidateOptions) -> Self {
338        Self {
339            dialect: options.dialect.unwrap_or(SqlDialect::MySql),
340            options,
341            issues: Vec::new(),
342            tables_from_ddl: AHashSet::new(),
343            tables_from_dml: Vec::new(),
344            schema_builder: SchemaBuilder::new(),
345            schema: None,
346            table_states: AHashMap::new(),
347            pending_fk_checks: Vec::new(),
348            progress_fn: None,
349            statement_count: 0,
350            syntax_errors: 0,
351            encoding_warnings: 0,
352            ddl_dml_errors: 0,
353            pk_errors: 0,
354            fk_errors: 0,
355            tracked_pk_count: 0,
356            tracked_fk_count: 0,
357            pk_fk_checks_disabled_due_to_memory: false,
358            current_copy_context: None,
359        }
360    }
361
362    /// Set a progress callback for byte-based progress tracking.
363    /// The callback receives cumulative bytes read across both validation passes.
364    pub fn with_progress<F>(mut self, f: F) -> Self
365    where
366        F: Fn(u64) + Send + Sync + 'static,
367    {
368        self.progress_fn = Some(Arc::new(f));
369        self
370    }
371
372    fn add_issue(&mut self, issue: ValidationIssue) {
373        if self.issues.len() >= MAX_ISSUES {
374            return;
375        }
376
377        match issue.severity {
378            Severity::Error => match issue.code {
379                "SYNTAX" => self.syntax_errors += 1,
380                "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
381                "DUPLICATE_PK" => self.pk_errors += 1,
382                "FK_MISSING_PARENT" => self.fk_errors += 1,
383                _ => {}
384            },
385            Severity::Warning => {
386                if issue.code == "ENCODING" {
387                    self.encoding_warnings += 1;
388                }
389            }
390            Severity::Info => {}
391        }
392
393        self.issues.push(issue);
394    }
395
396    /// Check if we've exceeded the memory budget for PK/FK tracking.
397    /// If so, disable further checks and free existing state.
398    fn enforce_pk_fk_memory_budget(&mut self) {
399        if self.pk_fk_checks_disabled_due_to_memory {
400            return;
401        }
402
403        let Some(limit) = self.options.max_pk_fk_keys else {
404            return;
405        };
406
407        let total_tracked = self.tracked_pk_count + self.tracked_fk_count;
408        if total_tracked > limit {
409            self.pk_fk_checks_disabled_due_to_memory = true;
410
411            // Drop existing state to free memory
412            for state in self.table_states.values_mut() {
413                state.pk_values = None;
414            }
415            self.pending_fk_checks.clear();
416            self.pending_fk_checks.shrink_to_fit();
417
418            self.add_issue(ValidationIssue::warning(
419                "PK_FK_CHECKS_SKIPPED_MEMORY",
420                format!(
421                    "Skipping PK/FK checks after tracking {} keys (memory limit of {} exceeded)",
422                    total_tracked, limit
423                ),
424            ));
425        }
426    }
427
428    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
429        let file = File::open(&self.options.path)?;
430        let file_size = file.metadata()?.len();
431        let buffer_size = determine_buffer_size(file_size);
432
433        // Pass 1 reports bytes as 0 to file_size/2 (first half of progress bar)
434        let compression = Compression::from_path(&self.options.path);
435        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
436            let cb = Arc::clone(cb);
437            let progress_reader = ProgressReader::new(file, move |bytes| {
438                // Scale to first half: 0% to 50%
439                cb(bytes / 2)
440            });
441            compression.wrap_reader(Box::new(progress_reader))
442        } else {
443            compression.wrap_reader(Box::new(file))
444        };
445
446        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
447
448        // Pass 1: Build schema and check DDL/DML consistency
449        loop {
450            match parser.read_statement() {
451                Ok(Some(stmt)) => {
452                    self.statement_count += 1;
453                    self.process_statement(&stmt);
454                }
455                Ok(None) => break,
456                Err(e) => {
457                    self.add_issue(
458                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
459                            .with_location(
460                                Location::new().with_statement(self.statement_count + 1),
461                            ),
462                    );
463                    break;
464                }
465            }
466        }
467
468        // Check for DML referencing missing tables - collect issues first, then add them
469        let missing_table_issues: Vec<_> = self
470            .tables_from_dml
471            .iter()
472            .filter(|(table, _)| {
473                let table_lower = table.to_lowercase();
474                !self
475                    .tables_from_ddl
476                    .iter()
477                    .any(|t| t.to_lowercase() == table_lower)
478            })
479            .map(|(table, stmt_idx)| {
480                ValidationIssue::error(
481                    "DDL_MISSING_TABLE",
482                    format!(
483                        "INSERT/COPY references table '{}' with no CREATE TABLE",
484                        table
485                    ),
486                )
487                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
488            })
489            .collect();
490
491        for issue in missing_table_issues {
492            self.add_issue(issue);
493        }
494
495        // Finalize schema and resolve FK references for data checks (all dialects)
496        if self.options.fk_checks_enabled {
497            self.schema = Some(self.schema_builder.build());
498            self.schema_builder = SchemaBuilder::new(); // Reset to avoid double use
499            self.initialize_table_states();
500        }
501
502        // Pass 2: Data checks (PK + collect FK refs) - requires re-reading the file
503        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
504        if self.options.fk_checks_enabled && schema_not_empty {
505            self.run_data_checks()?;
506            // Now that all PKs are loaded, validate the collected FK references
507            self.validate_pending_fk_checks();
508        }
509
510        Ok(self.build_summary())
511    }
512
513    fn process_statement(&mut self, stmt: &[u8]) {
514        // Check encoding
515        if std::str::from_utf8(stmt).is_err() {
516            self.add_issue(
517                ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
518                    .with_location(Location::new().with_statement(self.statement_count)),
519            );
520        }
521
522        let (stmt_type, table_name) =
523            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);
524
525        match stmt_type {
526            StatementType::CreateTable => {
527                if !table_name.is_empty() {
528                    self.tables_from_ddl.insert(table_name.clone());
529
530                    // Parse CREATE TABLE for schema info (all dialects supported)
531                    if let Ok(stmt_str) = std::str::from_utf8(stmt) {
532                        self.schema_builder.parse_create_table(stmt_str);
533                    }
534                }
535            }
536            StatementType::AlterTable => {
537                // Parse ALTER TABLE for FK constraints (all dialects supported)
538                if let Ok(stmt_str) = std::str::from_utf8(stmt) {
539                    self.schema_builder.parse_alter_table(stmt_str);
540                }
541            }
542            StatementType::Insert | StatementType::Copy => {
543                if !table_name.is_empty() {
544                    self.tables_from_dml
545                        .push((table_name, self.statement_count));
546                }
547            }
548            StatementType::Unknown => {
549                // Could be a session command or comment - not an error
550            }
551            _ => {}
552        }
553    }
554
555    fn initialize_table_states(&mut self) {
556        let schema = match &self.schema {
557            Some(s) => s,
558            None => return,
559        };
560
561        for table_schema in schema.iter() {
562            let pk_indices: Vec<usize> = table_schema
563                .primary_key
564                .iter()
565                .map(|col_id| col_id.0 as usize)
566                .collect();
567
568            let state = TableState::new().with_pk_columns(pk_indices);
569            self.table_states.insert(table_schema.id, state);
570        }
571    }
572
    /// Pass 2: re-reads the dump and runs the data checks (PK duplicates and
    /// FK-reference collection) against the schema built in pass 1.
    ///
    /// PostgreSQL COPY arrives as two statements (header, then raw data), so
    /// a one-shot context (`current_copy_context`) carries the header info
    /// into the following data statement.
    fn run_data_checks(&mut self) -> anyhow::Result<()> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        // Pass 2 reports bytes as file_size/2 to file_size (second half of progress bar)
        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                // Scale to second half: 50% to 100%
                cb(file_size / 2 + bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))
        } else {
            compression.wrap_reader(Box::new(file))
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
        let mut stmt_count: u64 = 0;

        // Reset COPY context for this pass
        self.current_copy_context = None;

        while let Some(stmt) = parser.read_statement()? {
            stmt_count += 1;

            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);

            // Handle PostgreSQL COPY data (separate statement from header)
            if self.dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                // Check if this looks like COPY data (ends with \.)
                if stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n") {
                    if let Some((ref copy_table, ref column_order, copy_table_id)) =
                        self.current_copy_context.clone()
                    {
                        self.check_copy_data(
                            &stmt,
                            copy_table_id,
                            copy_table,
                            column_order.clone(),
                            stmt_count,
                        );
                    }
                }
                // The context applies to at most one following statement.
                self.current_copy_context = None;
                continue;
            }

            // Get table_id without holding a borrow on self.schema
            let table_id = match &self.schema {
                Some(s) => match s.get_table_id(&table_name) {
                    Some(id) => id,
                    // Tables absent from the schema are skipped silently here;
                    // pass 1 already reported DDL_MISSING_TABLE for them.
                    None => continue,
                },
                None => continue,
            };

            match stmt_type {
                StatementType::Insert => {
                    // MySQL and SQLite use INSERT VALUES syntax
                    self.check_insert_statement(&stmt, table_id, &table_name, stmt_count);
                }
                StatementType::Copy => {
                    // For PostgreSQL COPY, the data comes in the next statement
                    // Save context for processing the data statement
                    let header = String::from_utf8_lossy(&stmt);
                    let column_order = postgres_copy::parse_copy_columns(&header);
                    self.current_copy_context = Some((table_name.clone(), column_order, table_id));
                }
                _ => continue,
            }
        }

        Ok(())
    }
650
651    /// Check rows from a MySQL/SQLite INSERT statement
652    fn check_insert_statement(
653        &mut self,
654        stmt: &[u8],
655        table_id: TableId,
656        table_name: &str,
657        stmt_count: u64,
658    ) {
659        let table_schema = match &self.schema {
660            Some(s) => match s.table(table_id) {
661                Some(ts) => ts,
662                None => return,
663            },
664            None => return,
665        };
666
667        // Parse rows from INSERT using the schema (works for MySQL and SQLite)
668        let rows = match mysql_insert::parse_mysql_insert_rows(stmt, table_schema) {
669            Ok(r) => r,
670            Err(_) => return,
671        };
672
673        for row in rows {
674            self.check_mysql_row(table_id, table_name, &row, stmt_count);
675        }
676    }
677
678    /// Check rows from a PostgreSQL COPY statement
679    fn check_copy_statement(
680        &mut self,
681        stmt: &[u8],
682        table_id: TableId,
683        table_name: &str,
684        stmt_count: u64,
685    ) {
686        // Find the COPY header line and the data section
687        let stmt_str = match std::str::from_utf8(stmt) {
688            Ok(s) => s,
689            Err(_) => return,
690        };
691
692        // Find the data section (after the header line ending with "FROM stdin;")
693        let data_start = if let Some(pos) = stmt_str.find("FROM stdin;") {
694            pos + "FROM stdin;".len()
695        } else if let Some(pos) = stmt_str.find("from stdin;") {
696            pos + "from stdin;".len()
697        } else {
698            return;
699        };
700
701        // Skip any whitespace/newlines after the header
702        let data_section = stmt_str[data_start..].trim_start();
703        if data_section.is_empty() {
704            return;
705        }
706
707        // Parse column list from the header
708        let header = &stmt_str[..data_start];
709        let column_order = postgres_copy::parse_copy_columns(header);
710
711        // Get table schema
712        let table_schema = match &self.schema {
713            Some(s) => match s.table(table_id) {
714                Some(ts) => ts,
715                None => return,
716            },
717            None => return,
718        };
719
720        // Parse the COPY data rows
721        let rows = match postgres_copy::parse_postgres_copy_rows(
722            data_section.as_bytes(),
723            table_schema,
724            column_order,
725        ) {
726            Ok(r) => r,
727            Err(_) => return,
728        };
729
730        for row in rows {
731            self.check_copy_row(table_id, table_name, &row, stmt_count);
732        }
733    }
734
735    /// Check rows from PostgreSQL COPY data (separate statement from header)
736    fn check_copy_data(
737        &mut self,
738        data_stmt: &[u8],
739        table_id: TableId,
740        table_name: &str,
741        column_order: Vec<String>,
742        stmt_count: u64,
743    ) {
744        // The data_stmt contains the raw COPY data lines (may have leading newline)
745        // Strip leading whitespace/newlines
746        let data: Vec<u8> = data_stmt
747            .iter()
748            .skip_while(|&&b| b == b'\n' || b == b'\r' || b == b' ' || b == b'\t')
749            .cloned()
750            .collect();
751
752        if data.is_empty() {
753            return;
754        }
755
756        // Get table schema
757        let table_schema = match &self.schema {
758            Some(s) => match s.table(table_id) {
759                Some(ts) => ts,
760                None => return,
761            },
762            None => return,
763        };
764
765        // Parse the COPY data rows
766        let rows = match postgres_copy::parse_postgres_copy_rows(&data, table_schema, column_order)
767        {
768            Ok(r) => r,
769            Err(_) => return,
770        };
771
772        for row in rows {
773            self.check_copy_row(table_id, table_name, &row, stmt_count);
774        }
775    }
776
777    /// Check a row from MySQL INSERT or SQLite INSERT
778    fn check_mysql_row(
779        &mut self,
780        table_id: TableId,
781        table_name: &str,
782        row: &mysql_insert::ParsedRow,
783        stmt_idx: u64,
784    ) {
785        self.check_row_common(
786            table_id,
787            table_name,
788            row.pk.as_ref(),
789            &row.fk_values,
790            stmt_idx,
791        );
792    }
793
794    /// Check a row from PostgreSQL COPY
795    fn check_copy_row(
796        &mut self,
797        table_id: TableId,
798        table_name: &str,
799        row: &postgres_copy::ParsedCopyRow,
800        stmt_idx: u64,
801    ) {
802        self.check_row_common(
803            table_id,
804            table_name,
805            row.pk.as_ref(),
806            &row.fk_values,
807            stmt_idx,
808        );
809    }
810
    /// Common row checking logic for all dialects.
    ///
    /// In order: enforces the per-table row cap, detects duplicate PKs via
    /// the hashed key set, and collects FK references for deferred
    /// validation once every table's PKs are loaded.
    fn check_row_common(
        &mut self,
        table_id: TableId,
        table_name: &str,
        pk: Option<&smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
        fk_values: &[(
            mysql_insert::FkRef,
            smallvec::SmallVec<[mysql_insert::PkValue; 2]>,
        )],
        stmt_idx: u64,
    ) {
        // Skip if memory budget exceeded
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        let max_rows = self.options.max_rows_per_table as u64;

        let state = match self.table_states.get_mut(&table_id) {
            Some(s) => s,
            None => return,
        };

        state.row_count += 1;

        // Check if we've exceeded max rows for this table
        if state.row_count > max_rows {
            // First row past the cap: drop the PK set (frees memory) and warn
            // once; subsequent rows find `None` and return silently.
            if state.pk_values.is_some() {
                state.pk_values = None;
                self.add_issue(
                    ValidationIssue::warning(
                        "PK_CHECK_SKIPPED",
                        format!(
                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
                            table_name, max_rows
                        ),
                    )
                    .with_location(Location::new().with_table(table_name)),
                );
            }
            return;
        }

        // PK duplicate check using hash-based storage (8 bytes per key instead of full values)
        if let Some(pk_values) = pk {
            if let Some(ref mut pk_set) = state.pk_values {
                let pk_hash = hash_pk_values(pk_values);

                if pk_set.insert(pk_hash) {
                    // Only count unique keys
                    self.tracked_pk_count += 1;
                    self.enforce_pk_fk_memory_budget();
                } else {
                    // Duplicate detected
                    state.pk_duplicates += 1;

                    // Build human-readable display on demand (duplicates are rare)
                    let pk_display: String = pk_values
                        .iter()
                        .map(|v| match v {
                            mysql_insert::PkValue::Int(i) => i.to_string(),
                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
                            mysql_insert::PkValue::Text(s) => s.to_string(),
                            mysql_insert::PkValue::Null => "NULL".to_string(),
                        })
                        .collect::<Vec<_>>()
                        .join(", ");

                    self.add_issue(
                        ValidationIssue::error(
                            "DUPLICATE_PK",
                            format!(
                                "Duplicate primary key in table '{}': ({})",
                                table_name, pk_display
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(table_name)
                                .with_statement(stmt_idx),
                        ),
                    );
                }
            }
        }

        // Skip FK collection if checks are disabled
        // (the budget enforcement above may have just tripped the flag).
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        // Collect FK references for deferred validation (after all PKs are loaded)
        // First, gather the FK checks into a temp vec to avoid borrow issues
        let new_fk_checks: Vec<PendingFkCheck> = {
            let schema = match &self.schema {
                Some(s) => s,
                None => return,
            };

            let table_schema = match schema.table(table_id) {
                Some(t) => t,
                None => return,
            };

            fk_values
                .iter()
                .filter_map(|(fk_ref, fk_vals)| {
                    // Skip if all FK values are NULL (nullable FK)
                    if fk_vals.iter().all(|v| v.is_null()) {
                        return None;
                    }

                    // Unresolvable FK definitions (index out of range, or a
                    // parent table never seen) are silently skipped via `?`.
                    let fk_def = table_schema.foreign_keys.get(fk_ref.fk_index as usize)?;
                    let parent_table_id = fk_def.referenced_table_id?;

                    // Store only the hash, not full values - saves significant memory
                    let fk_hash = hash_pk_values(fk_vals);

                    Some(PendingFkCheck {
                        child_table_id: table_id,
                        parent_table_id,
                        fk_hash,
                        stmt_idx,
                    })
                })
                .collect()
        };

        // Now add the FK checks and update memory tracking
        let new_count = new_fk_checks.len();
        self.pending_fk_checks.extend(new_fk_checks);
        self.tracked_fk_count += new_count;

        if new_count > 0 {
            self.enforce_pk_fk_memory_budget();
        }
    }
949
950    /// Validate all collected FK references after all PKs are loaded
951    fn validate_pending_fk_checks(&mut self) {
952        for check in std::mem::take(&mut self.pending_fk_checks) {
953            let parent_has_pk = self
954                .table_states
955                .get(&check.parent_table_id)
956                .and_then(|s| s.pk_values.as_ref())
957                .is_some_and(|set| set.contains(&check.fk_hash));
958
959            if !parent_has_pk {
960                let state = match self.table_states.get_mut(&check.child_table_id) {
961                    Some(s) => s,
962                    None => continue,
963                };
964                state.fk_missing_parents += 1;
965
966                // Only add issue for first few violations per table
967                if state.fk_missing_parents <= 5 {
968                    // Derive table names from the schema (not stored per FK to save memory)
969                    let (child_name, parent_name) = if let Some(schema) = &self.schema {
970                        let child = schema
971                            .table(check.child_table_id)
972                            .map(|t| t.name.clone())
973                            .unwrap_or_else(|| "<unknown>".to_string());
974                        let parent = schema
975                            .table(check.parent_table_id)
976                            .map(|t| t.name.clone())
977                            .unwrap_or_else(|| "<unknown>".to_string());
978                        (child, parent)
979                    } else {
980                        ("<unknown>".to_string(), "<unknown>".to_string())
981                    };
982
983                    self.add_issue(
984                        ValidationIssue::error(
985                            "FK_MISSING_PARENT",
986                            format!(
987                                "FK violation in '{}': references missing row in '{}'",
988                                child_name, parent_name
989                            ),
990                        )
991                        .with_location(
992                            Location::new()
993                                .with_table(child_name)
994                                .with_statement(check.stmt_idx),
995                        ),
996                    );
997                }
998            }
999        }
1000    }
1001
1002    fn build_summary(&self) -> ValidationSummary {
1003        let errors = self
1004            .issues
1005            .iter()
1006            .filter(|i| matches!(i.severity, Severity::Error))
1007            .count();
1008        let warnings = self
1009            .issues
1010            .iter()
1011            .filter(|i| matches!(i.severity, Severity::Warning))
1012            .count();
1013        let info = self
1014            .issues
1015            .iter()
1016            .filter(|i| matches!(i.severity, Severity::Info))
1017            .count();
1018
1019        let syntax_status = if self.syntax_errors > 0 {
1020            CheckStatus::Failed(self.syntax_errors)
1021        } else {
1022            CheckStatus::Ok
1023        };
1024
1025        let encoding_status = if self.encoding_warnings > 0 {
1026            CheckStatus::Failed(self.encoding_warnings)
1027        } else {
1028            CheckStatus::Ok
1029        };
1030
1031        let ddl_dml_status = if self.ddl_dml_errors > 0 {
1032            CheckStatus::Failed(self.ddl_dml_errors)
1033        } else {
1034            CheckStatus::Ok
1035        };
1036
1037        let pk_status = if !self.options.fk_checks_enabled {
1038            CheckStatus::Skipped("--no-fk-checks".to_string())
1039        } else if self.pk_fk_checks_disabled_due_to_memory {
1040            CheckStatus::Skipped("memory limit exceeded".to_string())
1041        } else if self.pk_errors > 0 {
1042            CheckStatus::Failed(self.pk_errors)
1043        } else {
1044            CheckStatus::Ok
1045        };
1046
1047        let fk_status = if !self.options.fk_checks_enabled {
1048            CheckStatus::Skipped("--no-fk-checks".to_string())
1049        } else if self.pk_fk_checks_disabled_due_to_memory {
1050            CheckStatus::Skipped("memory limit exceeded".to_string())
1051        } else if self.fk_errors > 0 {
1052            CheckStatus::Failed(self.fk_errors)
1053        } else {
1054            CheckStatus::Ok
1055        };
1056
1057        ValidationSummary {
1058            dialect: self.dialect.to_string(),
1059            issues: self.issues.clone(),
1060            summary: SummaryStats {
1061                errors,
1062                warnings,
1063                info,
1064                tables_scanned: self.tables_from_ddl.len(),
1065                statements_scanned: self.statement_count,
1066            },
1067            checks: CheckResults {
1068                syntax: syntax_status,
1069                encoding: encoding_status,
1070                ddl_dml_consistency: ddl_dml_status,
1071                pk_duplicates: pk_status,
1072                fk_integrity: fk_status,
1073            },
1074        }
1075    }
1076}