// sql_splitter/validate/mod.rs
1//! Validate module for SQL dump integrity checking.
2//!
3//! This module provides:
4//! - SQL syntax validation (via parser error detection)
5//! - DDL/DML consistency checks (INSERTs reference existing tables)
6//! - Duplicate primary key detection (all dialects)
7//! - FK referential integrity checking (all dialects)
8//! - Encoding validation (UTF-8)
9
10use crate::parser::{
11    determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
12};
13use crate::progress::ProgressReader;
14use crate::schema::{Schema, SchemaBuilder, TableId};
15use crate::splitter::Compression;
16use ahash::{AHashMap, AHashSet};
17use schemars::JsonSchema;
18use serde::Serialize;
19use std::fmt;
20use std::fs::File;
21use std::hash::{Hash, Hasher};
22use std::io::Read;
23use std::path::PathBuf;
24use std::sync::Arc;
25
/// Maximum number of issues to collect before stopping.
/// Once reached, `add_issue` silently drops further issues (their per-check
/// counters stop advancing too), bounding memory on pathological dumps.
const MAX_ISSUES: usize = 1000;
28
/// Issue severity level.
///
/// Serialized in lowercase (`"error"`, `"warning"`, `"info"`) via serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    Error,
    Warning,
    Info,
}
37
38impl fmt::Display for Severity {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            Severity::Error => write!(f, "ERROR"),
42            Severity::Warning => write!(f, "WARNING"),
43            Severity::Info => write!(f, "INFO"),
44        }
45    }
46}
47
/// Location in the SQL dump where an issue was found.
///
/// All fields are optional; `None` fields are omitted from JSON output.
#[derive(Debug, Clone, Serialize, JsonSchema)]
pub struct Location {
    /// Table the issue relates to, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table: Option<String>,
    /// Index of the statement within the dump, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub statement_index: Option<u64>,
    /// Approximate line number in the dump, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approx_line: Option<u64>,
}
58
59impl Location {
60    pub fn new() -> Self {
61        Self {
62            table: None,
63            statement_index: None,
64            approx_line: None,
65        }
66    }
67
68    pub fn with_table(mut self, table: impl Into<String>) -> Self {
69        self.table = Some(table.into());
70        self
71    }
72
73    pub fn with_statement(mut self, index: u64) -> Self {
74        self.statement_index = Some(index);
75        self
76    }
77
78    #[allow(dead_code)]
79    pub fn with_line(mut self, line: u64) -> Self {
80        self.approx_line = Some(line);
81        self
82    }
83}
84
impl Default for Location {
    /// Equivalent to [`Location::new`]: every field is `None`.
    fn default() -> Self {
        Self::new()
    }
}
90
/// A validation issue found in the SQL dump.
#[derive(Debug, Clone, Serialize, JsonSchema)]
pub struct ValidationIssue {
    /// Stable machine-readable code (e.g. "SYNTAX", "DUPLICATE_PK").
    pub code: &'static str,
    /// How serious the issue is.
    pub severity: Severity,
    /// Human-readable description.
    pub message: String,
    /// Where the issue was found, when known; omitted from JSON if `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub location: Option<Location>,
}
100
101impl ValidationIssue {
102    pub fn error(code: &'static str, message: impl Into<String>) -> Self {
103        Self {
104            code,
105            severity: Severity::Error,
106            message: message.into(),
107            location: None,
108        }
109    }
110
111    pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
112        Self {
113            code,
114            severity: Severity::Warning,
115            message: message.into(),
116            location: None,
117        }
118    }
119
120    pub fn info(code: &'static str, message: impl Into<String>) -> Self {
121        Self {
122            code,
123            severity: Severity::Info,
124            message: message.into(),
125            location: None,
126        }
127    }
128
129    pub fn with_location(mut self, location: Location) -> Self {
130        self.location = Some(location);
131        self
132    }
133}
134
135impl fmt::Display for ValidationIssue {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        write!(f, "{} [{}]", self.severity, self.code)?;
138        if let Some(ref loc) = self.location {
139            if let Some(ref table) = loc.table {
140                write!(f, " table={}", table)?;
141            }
142            if let Some(stmt) = loc.statement_index {
143                write!(f, " stmt={}", stmt)?;
144            }
145            if let Some(line) = loc.approx_line {
146                write!(f, " line~{}", line)?;
147            }
148        }
149        write!(f, ": {}", self.message)
150    }
151}
152
/// Validation options.
#[derive(Debug, Clone)]
pub struct ValidateOptions {
    /// Path to the SQL dump (may be compressed; see `Compression::from_path`).
    pub path: PathBuf,
    /// Dialect override; `None` falls back to MySQL (see `Validator::new`).
    pub dialect: Option<SqlDialect>,
    /// Progress display flag — not read in this module's visible code;
    /// presumably consumed by the caller/CLI layer.
    pub progress: bool,
    /// Strict mode flag — not read in this module's visible code.
    pub strict: bool,
    /// JSON output flag — not read in this module's visible code.
    pub json: bool,
    /// Per-table row cap; beyond it, PK/FK checks for that table are skipped.
    pub max_rows_per_table: usize,
    /// Master switch for the schema-based PK/FK data checks (pass 2).
    pub fk_checks_enabled: bool,
    /// Optional global cap on tracked PK/FK keys for memory safety.
    /// When exceeded, PK/FK checks are skipped for the remainder of the run.
    /// If None, no limit is enforced (default).
    pub max_pk_fk_keys: Option<usize>,
}
168
/// Validation summary with collected issues.
#[derive(Debug, Serialize, JsonSchema)]
pub struct ValidationSummary {
    /// Name of the dialect the dump was validated as.
    pub dialect: String,
    /// Collected issues (capped at `MAX_ISSUES`).
    pub issues: Vec<ValidationIssue>,
    /// Aggregate counts by severity plus scan totals.
    pub summary: SummaryStats,
    /// Per-check pass/fail/skip status.
    pub checks: CheckResults,
}
177
/// Aggregate issue counts and scan totals for a validation run.
#[derive(Debug, Serialize, JsonSchema)]
pub struct SummaryStats {
    /// Number of error-severity issues.
    pub errors: usize,
    /// Number of warning-severity issues.
    pub warnings: usize,
    /// Number of info-severity issues.
    pub info: usize,
    /// Number of tables scanned.
    pub tables_scanned: usize,
    /// Total statements parsed in pass 1.
    pub statements_scanned: u64,
}
186
/// Status for each individual validation check.
#[derive(Debug, Serialize, JsonSchema)]
pub struct CheckResults {
    /// SQL syntax validation (parser error detection).
    pub syntax: CheckStatus,
    /// UTF-8 encoding validation.
    pub encoding: CheckStatus,
    /// DDL/DML consistency (INSERT/COPY targets have a CREATE TABLE).
    pub ddl_dml_consistency: CheckStatus,
    /// Duplicate primary key detection.
    pub pk_duplicates: CheckStatus,
    /// FK referential integrity.
    pub fk_integrity: CheckStatus,
}
195
/// Outcome of a single validation check.
#[derive(Debug, Serialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum CheckStatus {
    /// Check ran and found no issues.
    Ok,
    /// Check ran and found this many issues.
    Failed(usize),
    /// Check did not run; the payload explains why.
    Skipped(String),
}
203
204impl fmt::Display for CheckStatus {
205    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206        match self {
207            CheckStatus::Ok => write!(f, "OK"),
208            CheckStatus::Failed(n) => write!(f, "{} issues", n),
209            CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
210        }
211    }
212}
213
214impl ValidationSummary {
215    pub fn has_errors(&self) -> bool {
216        self.summary.errors > 0
217    }
218
219    pub fn has_warnings(&self) -> bool {
220        self.summary.warnings > 0
221    }
222}
223
224/// Compact primary/foreign key representation for duplicate and FK checks.
225/// We use a 64-bit hash; collision risk is negligible for realistic dumps.
226type PkHash = u64;
227
228/// Hash a list of PK/FK values into a compact 64-bit hash.
229/// Uses AHash for fast, high-quality hashing.
230fn hash_pk_values(values: &smallvec::SmallVec<[mysql_insert::PkValue; 2]>) -> PkHash {
231    let mut hasher = ahash::AHasher::default();
232
233    // Include arity (number of columns) in the hash to distinguish (1) from (1, NULL)
234    (values.len() as u8).hash(&mut hasher);
235
236    for v in values {
237        match v {
238            mysql_insert::PkValue::Int(i) => {
239                0u8.hash(&mut hasher);
240                i.hash(&mut hasher);
241            }
242            mysql_insert::PkValue::BigInt(i) => {
243                1u8.hash(&mut hasher);
244                i.hash(&mut hasher);
245            }
246            mysql_insert::PkValue::Text(s) => {
247                2u8.hash(&mut hasher);
248                s.hash(&mut hasher);
249            }
250            mysql_insert::PkValue::Null => {
251                3u8.hash(&mut hasher);
252            }
253        }
254    }
255
256    hasher.finish()
257}
258
/// Pending FK check to be validated after all PKs are loaded.
/// Uses compact hash representation to minimize memory usage.
struct PendingFkCheck {
    /// Table containing the FK column(s).
    child_table_id: TableId,
    /// Table whose PK set must contain `fk_hash`.
    parent_table_id: TableId,
    /// Hash of the FK value tuple (see `hash_pk_values`).
    fk_hash: PkHash,
    /// Statement index where the row was seen (for issue locations).
    stmt_idx: u64,
}
267
/// Per-table tracking state for data checks.
/// Uses hashed PK values to minimize memory usage.
struct TableState {
    /// Rows seen so far for this table during pass 2.
    row_count: u64,
    /// Set of hashed PKs for duplicate and FK parent existence checks.
    /// When None, PK/FK checks for this table are skipped (due to row or memory limits).
    pk_values: Option<AHashSet<PkHash>>,
    /// Column indices forming the table's primary key.
    pk_column_indices: Vec<usize>,
    /// Count of duplicate primary keys detected in this table.
    pk_duplicates: u64,
    /// Count of FK values with no matching parent PK.
    fk_missing_parents: u64,
}
279
280impl TableState {
281    fn new() -> Self {
282        Self {
283            row_count: 0,
284            pk_values: Some(AHashSet::new()),
285            pk_column_indices: Vec::new(),
286            pk_duplicates: 0,
287            fk_missing_parents: 0,
288        }
289    }
290
291    fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
292        self.pk_column_indices = indices;
293        self
294    }
295}
296
/// SQL dump validator.
///
/// Drives a two-pass validation: pass 1 parses statements to run
/// syntax/encoding/DDL-DML checks and build the schema; pass 2 re-reads the
/// dump to run PK-duplicate and FK-integrity data checks.
pub struct Validator {
    options: ValidateOptions,
    // Issues collected so far (capped at MAX_ISSUES by add_issue)
    issues: Vec<ValidationIssue>,
    // Effective dialect (options.dialect or the MySQL default)
    dialect: SqlDialect,

    // DDL/DML tracking
    tables_from_ddl: AHashSet<String>,
    tables_from_dml: Vec<(String, u64)>, // (table_name, statement_index)

    // Schema built in pass 1, used for PK/FK checks in pass 2 (all dialects)
    schema_builder: SchemaBuilder,
    schema: Option<Schema>,

    // Per-table state for data checks
    table_states: AHashMap<TableId, TableState>,

    // Pending FK checks (deferred until all PKs are loaded)
    pending_fk_checks: Vec<PendingFkCheck>,

    // Progress callback for byte-based progress tracking (Arc for reuse across passes)
    progress_fn: Option<Arc<dyn Fn(u64) + Send + Sync>>,

    // Counters feeding the summary / per-check results
    statement_count: u64,
    syntax_errors: usize,
    encoding_warnings: usize,
    ddl_dml_errors: usize,
    pk_errors: usize,
    fk_errors: usize,

    // Memory tracking for PK/FK checks
    tracked_pk_count: usize,
    tracked_fk_count: usize,
    pk_fk_checks_disabled_due_to_memory: bool,

    // PostgreSQL COPY context: (table_name, column_order, table_id)
    current_copy_context: Option<(String, Vec<String>, TableId)>,
}
336
337impl Validator {
338    pub fn new(options: ValidateOptions) -> Self {
339        Self {
340            dialect: options.dialect.unwrap_or(SqlDialect::MySql),
341            options,
342            issues: Vec::new(),
343            tables_from_ddl: AHashSet::new(),
344            tables_from_dml: Vec::new(),
345            schema_builder: SchemaBuilder::new(),
346            schema: None,
347            table_states: AHashMap::new(),
348            pending_fk_checks: Vec::new(),
349            progress_fn: None,
350            statement_count: 0,
351            syntax_errors: 0,
352            encoding_warnings: 0,
353            ddl_dml_errors: 0,
354            pk_errors: 0,
355            fk_errors: 0,
356            tracked_pk_count: 0,
357            tracked_fk_count: 0,
358            pk_fk_checks_disabled_due_to_memory: false,
359            current_copy_context: None,
360        }
361    }
362
    /// Set a progress callback for byte-based progress tracking.
    /// The callback receives cumulative bytes read across both validation
    /// passes (pass 1 maps to the first half of the byte range, pass 2 to
    /// the second). Stored as an `Arc` so both passes can share the closure.
    pub fn with_progress<F>(mut self, f: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        self.progress_fn = Some(Arc::new(f));
        self
    }
372
373    fn add_issue(&mut self, issue: ValidationIssue) {
374        if self.issues.len() >= MAX_ISSUES {
375            return;
376        }
377
378        match issue.severity {
379            Severity::Error => match issue.code {
380                "SYNTAX" => self.syntax_errors += 1,
381                "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
382                "DUPLICATE_PK" => self.pk_errors += 1,
383                "FK_MISSING_PARENT" => self.fk_errors += 1,
384                _ => {}
385            },
386            Severity::Warning => {
387                if issue.code == "ENCODING" {
388                    self.encoding_warnings += 1;
389                }
390            }
391            Severity::Info => {}
392        }
393
394        self.issues.push(issue);
395    }
396
397    /// Check if we've exceeded the memory budget for PK/FK tracking.
398    /// If so, disable further checks and free existing state.
399    fn enforce_pk_fk_memory_budget(&mut self) {
400        if self.pk_fk_checks_disabled_due_to_memory {
401            return;
402        }
403
404        let Some(limit) = self.options.max_pk_fk_keys else {
405            return;
406        };
407
408        let total_tracked = self.tracked_pk_count + self.tracked_fk_count;
409        if total_tracked > limit {
410            self.pk_fk_checks_disabled_due_to_memory = true;
411
412            // Drop existing state to free memory
413            for state in self.table_states.values_mut() {
414                state.pk_values = None;
415            }
416            self.pending_fk_checks.clear();
417            self.pending_fk_checks.shrink_to_fit();
418
419            self.add_issue(ValidationIssue::warning(
420                "PK_FK_CHECKS_SKIPPED_MEMORY",
421                format!(
422                    "Skipping PK/FK checks after tracking {} keys (memory limit of {} exceeded)",
423                    total_tracked, limit
424                ),
425            ));
426        }
427    }
428
    /// Run the full validation and consume the validator.
    ///
    /// Pass 1 streams the dump once to check syntax/encoding, record DDL/DML
    /// table usage, and build the schema. If FK checks are enabled and the
    /// schema is non-empty, pass 2 re-reads the dump for PK/FK data checks.
    /// I/O and decompression errors are propagated as `Err`.
    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        // Pass 1 reports bytes as 0 to file_size/2 (first half of progress bar)
        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                // Scale to first half: 0% to 50%
                cb(bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);

        // Pass 1: Build schema and check DDL/DML consistency.
        // A parser error is recorded as a SYNTAX issue and ends the pass.
        loop {
            match parser.read_statement() {
                Ok(Some(stmt)) => {
                    self.statement_count += 1;
                    self.process_statement(&stmt);
                }
                Ok(None) => break,
                Err(e) => {
                    self.add_issue(
                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
                            .with_location(
                                Location::new().with_statement(self.statement_count + 1),
                            ),
                    );
                    break;
                }
            }
        }

        // Check for DML referencing missing tables - collect issues first, then
        // add them (two phases to avoid borrowing self while iterating).
        // Table-name comparison is case-insensitive.
        let missing_table_issues: Vec<_> = self
            .tables_from_dml
            .iter()
            .filter(|(table, _)| {
                let table_lower = table.to_lowercase();
                !self
                    .tables_from_ddl
                    .iter()
                    .any(|t| t.to_lowercase() == table_lower)
            })
            .map(|(table, stmt_idx)| {
                ValidationIssue::error(
                    "DDL_MISSING_TABLE",
                    format!(
                        "INSERT/COPY references table '{}' with no CREATE TABLE",
                        table
                    ),
                )
                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
            })
            .collect();

        for issue in missing_table_issues {
            self.add_issue(issue);
        }

        // Finalize schema and resolve FK references for data checks (all dialects)
        if self.options.fk_checks_enabled {
            self.schema = Some(self.schema_builder.build());
            self.schema_builder = SchemaBuilder::new(); // Reset to avoid double use
            self.initialize_table_states();
        }

        // Pass 2: Data checks (PK + collect FK refs) - requires re-reading the file
        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
        if self.options.fk_checks_enabled && schema_not_empty {
            self.run_data_checks()?;
            // Now that all PKs are loaded, validate the collected FK references
            self.validate_pending_fk_checks();
        }

        Ok(self.build_summary())
    }
513
514    fn process_statement(&mut self, stmt: &[u8]) {
515        // Check encoding
516        if std::str::from_utf8(stmt).is_err() {
517            self.add_issue(
518                ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
519                    .with_location(Location::new().with_statement(self.statement_count)),
520            );
521        }
522
523        let (stmt_type, table_name) =
524            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);
525
526        match stmt_type {
527            StatementType::CreateTable => {
528                if !table_name.is_empty() {
529                    self.tables_from_ddl.insert(table_name.clone());
530
531                    // Parse CREATE TABLE for schema info (all dialects supported)
532                    if let Ok(stmt_str) = std::str::from_utf8(stmt) {
533                        self.schema_builder.parse_create_table(stmt_str);
534                    }
535                }
536            }
537            StatementType::AlterTable => {
538                // Parse ALTER TABLE for FK constraints (all dialects supported)
539                if let Ok(stmt_str) = std::str::from_utf8(stmt) {
540                    self.schema_builder.parse_alter_table(stmt_str);
541                }
542            }
543            StatementType::Insert | StatementType::Copy => {
544                if !table_name.is_empty() {
545                    self.tables_from_dml
546                        .push((table_name, self.statement_count));
547                }
548            }
549            StatementType::Unknown => {
550                // Could be a session command or comment - not an error
551            }
552            _ => {}
553        }
554    }
555
556    fn initialize_table_states(&mut self) {
557        let schema = match &self.schema {
558            Some(s) => s,
559            None => return,
560        };
561
562        for table_schema in schema.iter() {
563            let pk_indices: Vec<usize> = table_schema
564                .primary_key
565                .iter()
566                .map(|col_id| col_id.0 as usize)
567                .collect();
568
569            let state = TableState::new().with_pk_columns(pk_indices);
570            self.table_states.insert(table_schema.id, state);
571        }
572    }
573
    /// Pass 2: re-read the dump and run per-row data checks (PK duplicates
    /// plus FK reference collection). Requires `self.schema` to be built.
    ///
    /// PostgreSQL COPY arrives as two statements (header, then raw data),
    /// bridged via `self.current_copy_context`.
    fn run_data_checks(&mut self) -> anyhow::Result<()> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        // Pass 2 reports bytes as file_size/2 to file_size (second half of progress bar)
        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                // Scale to second half: 50% to 100%
                cb(file_size / 2 + bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
        let mut stmt_count: u64 = 0;

        // Reset COPY context for this pass
        self.current_copy_context = None;

        while let Some(stmt) = parser.read_statement()? {
            stmt_count += 1;

            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);

            // Handle PostgreSQL COPY data (separate statement from header)
            if self.dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                // Check if this looks like COPY data (ends with \.)
                if stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n") {
                    if let Some((ref copy_table, ref column_order, copy_table_id)) =
                        self.current_copy_context.clone()
                    {
                        self.check_copy_data(
                            &stmt,
                            copy_table_id,
                            copy_table,
                            column_order.clone(),
                            stmt_count,
                        );
                    }
                }
                // Context is single-use: any Unknown statement clears it.
                self.current_copy_context = None;
                continue;
            }

            // Get table_id without holding a borrow on self.schema
            let table_id = match &self.schema {
                Some(s) => match s.get_table_id(&table_name) {
                    Some(id) => id,
                    None => continue,
                },
                None => continue,
            };

            match stmt_type {
                StatementType::Insert => {
                    // MySQL and SQLite use INSERT VALUES syntax
                    self.check_insert_statement(&stmt, table_id, &table_name, stmt_count);
                }
                StatementType::Copy => {
                    // For PostgreSQL COPY, the data comes in the next statement
                    // Save context for processing the data statement
                    let header = String::from_utf8_lossy(&stmt);
                    let column_order = postgres_copy::parse_copy_columns(&header);
                    self.current_copy_context = Some((table_name.clone(), column_order, table_id));
                }
                _ => continue,
            }
        }

        Ok(())
    }
651
652    /// Check rows from a MySQL/SQLite INSERT statement
653    fn check_insert_statement(
654        &mut self,
655        stmt: &[u8],
656        table_id: TableId,
657        table_name: &str,
658        stmt_count: u64,
659    ) {
660        let table_schema = match &self.schema {
661            Some(s) => match s.table(table_id) {
662                Some(ts) => ts,
663                None => return,
664            },
665            None => return,
666        };
667
668        // Parse rows from INSERT using the schema (works for MySQL and SQLite)
669        let rows = match mysql_insert::parse_mysql_insert_rows(stmt, table_schema) {
670            Ok(r) => r,
671            Err(_) => return,
672        };
673
674        for row in rows {
675            self.check_mysql_row(table_id, table_name, &row, stmt_count);
676        }
677    }
678
    /// Check rows from a PostgreSQL COPY statement where the header line
    /// (ending in `FROM stdin;`) and the data section arrived as a single
    /// statement.
    ///
    /// NOTE(review): no caller is visible in this chunk — pass 2 routes COPY
    /// through `current_copy_context` + `check_copy_data` instead. Confirm
    /// whether this path is still reachable before removing it.
    fn check_copy_statement(
        &mut self,
        stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        stmt_count: u64,
    ) {
        // Find the COPY header line and the data section
        let stmt_str = match std::str::from_utf8(stmt) {
            Ok(s) => s,
            Err(_) => return,
        };

        // Find the data section (after the header line ending with "FROM stdin;")
        let data_start = if let Some(pos) = stmt_str.find("FROM stdin;") {
            pos + "FROM stdin;".len()
        } else if let Some(pos) = stmt_str.find("from stdin;") {
            pos + "from stdin;".len()
        } else {
            return;
        };

        // Skip any whitespace/newlines after the header
        let data_section = stmt_str[data_start..].trim_start();
        if data_section.is_empty() {
            return;
        }

        // Parse column list from the header
        let header = &stmt_str[..data_start];
        let column_order = postgres_copy::parse_copy_columns(header);

        // Get table schema
        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        // Parse the COPY data rows
        let rows = match postgres_copy::parse_postgres_copy_rows(
            data_section.as_bytes(),
            table_schema,
            column_order,
        ) {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_copy_row(table_id, table_name, &row, stmt_count);
        }
    }
735
736    /// Check rows from PostgreSQL COPY data (separate statement from header)
737    fn check_copy_data(
738        &mut self,
739        data_stmt: &[u8],
740        table_id: TableId,
741        table_name: &str,
742        column_order: Vec<String>,
743        stmt_count: u64,
744    ) {
745        // The data_stmt contains the raw COPY data lines (may have leading newline)
746        // Strip leading whitespace/newlines
747        let data: Vec<u8> = data_stmt
748            .iter()
749            .skip_while(|&&b| b == b'\n' || b == b'\r' || b == b' ' || b == b'\t')
750            .cloned()
751            .collect();
752
753        if data.is_empty() {
754            return;
755        }
756
757        // Get table schema
758        let table_schema = match &self.schema {
759            Some(s) => match s.table(table_id) {
760                Some(ts) => ts,
761                None => return,
762            },
763            None => return,
764        };
765
766        // Parse the COPY data rows
767        let rows = match postgres_copy::parse_postgres_copy_rows(&data, table_schema, column_order)
768        {
769            Ok(r) => r,
770            Err(_) => return,
771        };
772
773        for row in rows {
774            self.check_copy_row(table_id, table_name, &row, stmt_count);
775        }
776    }
777
    /// Check a row from MySQL INSERT or SQLite INSERT.
    /// Thin adapter forwarding the row's PK and FK values to
    /// `check_row_common`.
    fn check_mysql_row(
        &mut self,
        table_id: TableId,
        table_name: &str,
        row: &mysql_insert::ParsedRow,
        stmt_idx: u64,
    ) {
        self.check_row_common(
            table_id,
            table_name,
            row.pk.as_ref(),
            &row.fk_values,
            stmt_idx,
        );
    }
794
    /// Check a row from PostgreSQL COPY.
    /// Thin adapter forwarding the row's PK and FK values to
    /// `check_row_common`.
    fn check_copy_row(
        &mut self,
        table_id: TableId,
        table_name: &str,
        row: &postgres_copy::ParsedCopyRow,
        stmt_idx: u64,
    ) {
        self.check_row_common(
            table_id,
            table_name,
            row.pk.as_ref(),
            &row.fk_values,
            stmt_idx,
        );
    }
811
    /// Common row checking logic for all dialects.
    ///
    /// Counts the row against the per-table cap, performs the hashed PK
    /// duplicate check, and queues hashed FK references for deferred
    /// validation once all parent PKs are known.
    fn check_row_common(
        &mut self,
        table_id: TableId,
        table_name: &str,
        pk: Option<&smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
        fk_values: &[(
            mysql_insert::FkRef,
            smallvec::SmallVec<[mysql_insert::PkValue; 2]>,
        )],
        stmt_idx: u64,
    ) {
        // Skip if memory budget exceeded
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        let max_rows = self.options.max_rows_per_table as u64;

        let state = match self.table_states.get_mut(&table_id) {
            Some(s) => s,
            None => return,
        };

        state.row_count += 1;

        // Check if we've exceeded max rows for this table. Warn only once:
        // on the transition where pk_values is dropped.
        if state.row_count > max_rows {
            if state.pk_values.is_some() {
                state.pk_values = None;
                self.add_issue(
                    ValidationIssue::warning(
                        "PK_CHECK_SKIPPED",
                        format!(
                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
                            table_name, max_rows
                        ),
                    )
                    .with_location(Location::new().with_table(table_name)),
                );
            }
            return;
        }

        // PK duplicate check using hash-based storage (8 bytes per key instead of full values)
        if let Some(pk_values) = pk {
            if let Some(ref mut pk_set) = state.pk_values {
                let pk_hash = hash_pk_values(pk_values);

                if pk_set.insert(pk_hash) {
                    // Only count unique keys
                    self.tracked_pk_count += 1;
                    self.enforce_pk_fk_memory_budget();
                } else {
                    // Duplicate detected
                    state.pk_duplicates += 1;

                    // Build human-readable display on demand (duplicates are rare)
                    let pk_display: String = pk_values
                        .iter()
                        .map(|v| match v {
                            mysql_insert::PkValue::Int(i) => i.to_string(),
                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
                            mysql_insert::PkValue::Text(s) => s.to_string(),
                            mysql_insert::PkValue::Null => "NULL".to_string(),
                        })
                        .collect::<Vec<_>>()
                        .join(", ");

                    self.add_issue(
                        ValidationIssue::error(
                            "DUPLICATE_PK",
                            format!(
                                "Duplicate primary key in table '{}': ({})",
                                table_name, pk_display
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(table_name)
                                .with_statement(stmt_idx),
                        ),
                    );
                }
            }
        }

        // Skip FK collection if checks are disabled
        // (enforce_pk_fk_memory_budget above may have just tripped the limit)
        if self.pk_fk_checks_disabled_due_to_memory {
            return;
        }

        // Collect FK references for deferred validation (after all PKs are loaded)
        // First, gather the FK checks into a temp vec to avoid borrow issues
        let new_fk_checks: Vec<PendingFkCheck> = {
            let schema = match &self.schema {
                Some(s) => s,
                None => return,
            };

            let table_schema = match schema.table(table_id) {
                Some(t) => t,
                None => return,
            };

            fk_values
                .iter()
                .filter_map(|(fk_ref, fk_vals)| {
                    // Skip if all FK values are NULL (nullable FK)
                    if fk_vals.iter().all(|v| v.is_null()) {
                        return None;
                    }

                    let fk_def = table_schema.foreign_keys.get(fk_ref.fk_index as usize)?;
                    let parent_table_id = fk_def.referenced_table_id?;

                    // Store only the hash, not full values - saves significant memory
                    let fk_hash = hash_pk_values(fk_vals);

                    Some(PendingFkCheck {
                        child_table_id: table_id,
                        parent_table_id,
                        fk_hash,
                        stmt_idx,
                    })
                })
                .collect()
        };

        // Now add the FK checks and update memory tracking
        let new_count = new_fk_checks.len();
        self.pending_fk_checks.extend(new_fk_checks);
        self.tracked_fk_count += new_count;

        if new_count > 0 {
            self.enforce_pk_fk_memory_budget();
        }
    }
950
951    /// Validate all collected FK references after all PKs are loaded
952    fn validate_pending_fk_checks(&mut self) {
953        for check in std::mem::take(&mut self.pending_fk_checks) {
954            let parent_has_pk = self
955                .table_states
956                .get(&check.parent_table_id)
957                .and_then(|s| s.pk_values.as_ref())
958                .is_some_and(|set| set.contains(&check.fk_hash));
959
960            if !parent_has_pk {
961                let state = match self.table_states.get_mut(&check.child_table_id) {
962                    Some(s) => s,
963                    None => continue,
964                };
965                state.fk_missing_parents += 1;
966
967                // Only add issue for first few violations per table
968                if state.fk_missing_parents <= 5 {
969                    // Derive table names from the schema (not stored per FK to save memory)
970                    let (child_name, parent_name) = if let Some(schema) = &self.schema {
971                        let child = schema
972                            .table(check.child_table_id)
973                            .map(|t| t.name.clone())
974                            .unwrap_or_else(|| "<unknown>".to_string());
975                        let parent = schema
976                            .table(check.parent_table_id)
977                            .map(|t| t.name.clone())
978                            .unwrap_or_else(|| "<unknown>".to_string());
979                        (child, parent)
980                    } else {
981                        ("<unknown>".to_string(), "<unknown>".to_string())
982                    };
983
984                    self.add_issue(
985                        ValidationIssue::error(
986                            "FK_MISSING_PARENT",
987                            format!(
988                                "FK violation in '{}': references missing row in '{}'",
989                                child_name, parent_name
990                            ),
991                        )
992                        .with_location(
993                            Location::new()
994                                .with_table(child_name)
995                                .with_statement(check.stmt_idx),
996                        ),
997                    );
998                }
999            }
1000        }
1001    }
1002
1003    fn build_summary(&self) -> ValidationSummary {
1004        let errors = self
1005            .issues
1006            .iter()
1007            .filter(|i| matches!(i.severity, Severity::Error))
1008            .count();
1009        let warnings = self
1010            .issues
1011            .iter()
1012            .filter(|i| matches!(i.severity, Severity::Warning))
1013            .count();
1014        let info = self
1015            .issues
1016            .iter()
1017            .filter(|i| matches!(i.severity, Severity::Info))
1018            .count();
1019
1020        let syntax_status = if self.syntax_errors > 0 {
1021            CheckStatus::Failed(self.syntax_errors)
1022        } else {
1023            CheckStatus::Ok
1024        };
1025
1026        let encoding_status = if self.encoding_warnings > 0 {
1027            CheckStatus::Failed(self.encoding_warnings)
1028        } else {
1029            CheckStatus::Ok
1030        };
1031
1032        let ddl_dml_status = if self.ddl_dml_errors > 0 {
1033            CheckStatus::Failed(self.ddl_dml_errors)
1034        } else {
1035            CheckStatus::Ok
1036        };
1037
1038        let pk_status = if !self.options.fk_checks_enabled {
1039            CheckStatus::Skipped("--no-fk-checks".to_string())
1040        } else if self.pk_fk_checks_disabled_due_to_memory {
1041            CheckStatus::Skipped("memory limit exceeded".to_string())
1042        } else if self.pk_errors > 0 {
1043            CheckStatus::Failed(self.pk_errors)
1044        } else {
1045            CheckStatus::Ok
1046        };
1047
1048        let fk_status = if !self.options.fk_checks_enabled {
1049            CheckStatus::Skipped("--no-fk-checks".to_string())
1050        } else if self.pk_fk_checks_disabled_due_to_memory {
1051            CheckStatus::Skipped("memory limit exceeded".to_string())
1052        } else if self.fk_errors > 0 {
1053            CheckStatus::Failed(self.fk_errors)
1054        } else {
1055            CheckStatus::Ok
1056        };
1057
1058        ValidationSummary {
1059            dialect: self.dialect.to_string(),
1060            issues: self.issues.clone(),
1061            summary: SummaryStats {
1062                errors,
1063                warnings,
1064                info,
1065                tables_scanned: self.tables_from_ddl.len(),
1066                statements_scanned: self.statement_count,
1067            },
1068            checks: CheckResults {
1069                syntax: syntax_status,
1070                encoding: encoding_status,
1071                ddl_dml_consistency: ddl_dml_status,
1072                pk_duplicates: pk_status,
1073                fk_integrity: fk_status,
1074            },
1075        }
1076    }
1077}