sql_splitter/validate/
mod.rs

1//! Validate module for SQL dump integrity checking.
2//!
3//! This module provides:
4//! - SQL syntax validation (via parser error detection)
5//! - DDL/DML consistency checks (INSERTs reference existing tables)
6//! - Duplicate primary key detection (MySQL only)
7//! - FK referential integrity checking (MySQL only)
8//! - Encoding validation (UTF-8)
9
10use crate::parser::{determine_buffer_size, mysql_insert, Parser, SqlDialect, StatementType};
11use crate::schema::{Schema, SchemaBuilder, TableId};
12use crate::splitter::Compression;
13use ahash::{AHashMap, AHashSet};
14use serde::Serialize;
15use std::fmt;
16use std::fs::File;
17use std::io::Read;
18use std::path::PathBuf;
19
/// Upper bound on how many issues a single validation run stores.
const MAX_ISSUES: usize = 1_000;
22
23/// Issue severity level
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
25#[serde(rename_all = "lowercase")]
26pub enum Severity {
27    Error,
28    Warning,
29    Info,
30}
31
32impl fmt::Display for Severity {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        match self {
35            Severity::Error => write!(f, "ERROR"),
36            Severity::Warning => write!(f, "WARNING"),
37            Severity::Info => write!(f, "INFO"),
38        }
39    }
40}
41
42/// Location in the SQL dump where an issue was found
43#[derive(Debug, Clone, Serialize)]
44pub struct Location {
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub table: Option<String>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub statement_index: Option<u64>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub approx_line: Option<u64>,
51}
52
53impl Location {
54    pub fn new() -> Self {
55        Self {
56            table: None,
57            statement_index: None,
58            approx_line: None,
59        }
60    }
61
62    pub fn with_table(mut self, table: impl Into<String>) -> Self {
63        self.table = Some(table.into());
64        self
65    }
66
67    pub fn with_statement(mut self, index: u64) -> Self {
68        self.statement_index = Some(index);
69        self
70    }
71
72    #[allow(dead_code)]
73    pub fn with_line(mut self, line: u64) -> Self {
74        self.approx_line = Some(line);
75        self
76    }
77}
78
79impl Default for Location {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85/// A validation issue found in the SQL dump
86#[derive(Debug, Clone, Serialize)]
87pub struct ValidationIssue {
88    pub code: &'static str,
89    pub severity: Severity,
90    pub message: String,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub location: Option<Location>,
93}
94
95impl ValidationIssue {
96    pub fn error(code: &'static str, message: impl Into<String>) -> Self {
97        Self {
98            code,
99            severity: Severity::Error,
100            message: message.into(),
101            location: None,
102        }
103    }
104
105    pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
106        Self {
107            code,
108            severity: Severity::Warning,
109            message: message.into(),
110            location: None,
111        }
112    }
113
114    pub fn info(code: &'static str, message: impl Into<String>) -> Self {
115        Self {
116            code,
117            severity: Severity::Info,
118            message: message.into(),
119            location: None,
120        }
121    }
122
123    pub fn with_location(mut self, location: Location) -> Self {
124        self.location = Some(location);
125        self
126    }
127}
128
129impl fmt::Display for ValidationIssue {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        write!(f, "{} [{}]", self.severity, self.code)?;
132        if let Some(ref loc) = self.location {
133            if let Some(ref table) = loc.table {
134                write!(f, " table={}", table)?;
135            }
136            if let Some(stmt) = loc.statement_index {
137                write!(f, " stmt={}", stmt)?;
138            }
139            if let Some(line) = loc.approx_line {
140                write!(f, " line~{}", line)?;
141            }
142        }
143        write!(f, ": {}", self.message)
144    }
145}
146
147/// Validation options
148#[derive(Debug, Clone)]
149pub struct ValidateOptions {
150    pub path: PathBuf,
151    pub dialect: Option<SqlDialect>,
152    pub progress: bool,
153    pub strict: bool,
154    pub json: bool,
155    pub max_rows_per_table: usize,
156    pub fk_checks_enabled: bool,
157}
158
159/// Validation summary with collected issues
160#[derive(Debug, Serialize)]
161pub struct ValidationSummary {
162    pub dialect: String,
163    pub issues: Vec<ValidationIssue>,
164    pub summary: SummaryStats,
165    pub checks: CheckResults,
166}
167
168#[derive(Debug, Serialize)]
169pub struct SummaryStats {
170    pub errors: usize,
171    pub warnings: usize,
172    pub info: usize,
173    pub tables_scanned: usize,
174    pub statements_scanned: u64,
175}
176
177#[derive(Debug, Serialize)]
178pub struct CheckResults {
179    pub syntax: CheckStatus,
180    pub encoding: CheckStatus,
181    pub ddl_dml_consistency: CheckStatus,
182    pub pk_duplicates: CheckStatus,
183    pub fk_integrity: CheckStatus,
184}
185
186#[derive(Debug, Serialize)]
187#[serde(rename_all = "lowercase")]
188pub enum CheckStatus {
189    Ok,
190    Failed(usize),
191    Skipped(String),
192}
193
194impl fmt::Display for CheckStatus {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        match self {
197            CheckStatus::Ok => write!(f, "OK"),
198            CheckStatus::Failed(n) => write!(f, "{} issues", n),
199            CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
200        }
201    }
202}
203
204impl ValidationSummary {
205    pub fn has_errors(&self) -> bool {
206        self.summary.errors > 0
207    }
208
209    pub fn has_warnings(&self) -> bool {
210        self.summary.warnings > 0
211    }
212}
213
214/// Primary key tuple for duplicate detection
215type PkTuple = Vec<Vec<u8>>;
216
217/// Per-table tracking state for data checks
218struct TableState {
219    row_count: u64,
220    pk_values: Option<AHashSet<PkTuple>>,
221    pk_column_indices: Vec<usize>,
222    pk_duplicates: u64,
223    fk_missing_parents: u64,
224}
225
226impl TableState {
227    fn new() -> Self {
228        Self {
229            row_count: 0,
230            pk_values: Some(AHashSet::new()),
231            pk_column_indices: Vec::new(),
232            pk_duplicates: 0,
233            fk_missing_parents: 0,
234        }
235    }
236
237    fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
238        self.pk_column_indices = indices;
239        self
240    }
241}
242
243/// SQL dump validator
244pub struct Validator {
245    options: ValidateOptions,
246    issues: Vec<ValidationIssue>,
247    dialect: SqlDialect,
248
249    // DDL/DML tracking
250    tables_from_ddl: AHashSet<String>,
251    tables_from_dml: Vec<(String, u64)>, // (table_name, statement_index)
252
253    // Schema for MySQL PK/FK checks
254    schema_builder: SchemaBuilder,
255    schema: Option<Schema>,
256
257    // Per-table state for data checks
258    table_states: AHashMap<TableId, TableState>,
259
260    // Counters
261    statement_count: u64,
262    syntax_errors: usize,
263    encoding_warnings: usize,
264    ddl_dml_errors: usize,
265    pk_errors: usize,
266    fk_errors: usize,
267}
268
269impl Validator {
270    pub fn new(options: ValidateOptions) -> Self {
271        Self {
272            dialect: options.dialect.unwrap_or(SqlDialect::MySql),
273            options,
274            issues: Vec::new(),
275            tables_from_ddl: AHashSet::new(),
276            tables_from_dml: Vec::new(),
277            schema_builder: SchemaBuilder::new(),
278            schema: None,
279            table_states: AHashMap::new(),
280            statement_count: 0,
281            syntax_errors: 0,
282            encoding_warnings: 0,
283            ddl_dml_errors: 0,
284            pk_errors: 0,
285            fk_errors: 0,
286        }
287    }
288
289    fn add_issue(&mut self, issue: ValidationIssue) {
290        if self.issues.len() >= MAX_ISSUES {
291            return;
292        }
293
294        match issue.severity {
295            Severity::Error => match issue.code {
296                "SYNTAX" => self.syntax_errors += 1,
297                "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
298                "DUPLICATE_PK" => self.pk_errors += 1,
299                "FK_MISSING_PARENT" => self.fk_errors += 1,
300                _ => {}
301            },
302            Severity::Warning => {
303                if issue.code == "ENCODING" {
304                    self.encoding_warnings += 1;
305                }
306            }
307            Severity::Info => {}
308        }
309
310        self.issues.push(issue);
311    }
312
313    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
314        let file = File::open(&self.options.path)?;
315        let file_size = file.metadata()?.len();
316        let buffer_size = determine_buffer_size(file_size);
317
318        let compression = Compression::from_path(&self.options.path);
319        let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file));
320
321        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
322
323        // Pass 1: Build schema and check DDL/DML consistency
324        loop {
325            match parser.read_statement() {
326                Ok(Some(stmt)) => {
327                    self.statement_count += 1;
328                    self.process_statement(&stmt);
329                }
330                Ok(None) => break,
331                Err(e) => {
332                    self.add_issue(
333                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
334                            .with_location(
335                                Location::new().with_statement(self.statement_count + 1),
336                            ),
337                    );
338                    break;
339                }
340            }
341        }
342
343        // Check for DML referencing missing tables - collect issues first, then add them
344        let missing_table_issues: Vec<_> = self
345            .tables_from_dml
346            .iter()
347            .filter(|(table, _)| {
348                let table_lower = table.to_lowercase();
349                !self
350                    .tables_from_ddl
351                    .iter()
352                    .any(|t| t.to_lowercase() == table_lower)
353            })
354            .map(|(table, stmt_idx)| {
355                ValidationIssue::error(
356                    "DDL_MISSING_TABLE",
357                    format!(
358                        "INSERT/COPY references table '{}' with no CREATE TABLE",
359                        table
360                    ),
361                )
362                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
363            })
364            .collect();
365
366        for issue in missing_table_issues {
367            self.add_issue(issue);
368        }
369
370        // Finalize schema and resolve FK references for data checks
371        if self.dialect == SqlDialect::MySql && self.options.fk_checks_enabled {
372            self.schema = Some(self.schema_builder.build());
373            self.schema_builder = SchemaBuilder::new(); // Reset to avoid double use
374            self.initialize_table_states();
375        }
376
377        // Pass 2: Data checks (PK/FK) - requires re-reading the file
378        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
379        if self.dialect == SqlDialect::MySql && self.options.fk_checks_enabled && schema_not_empty {
380            self.run_data_checks()?;
381        } else if self.dialect != SqlDialect::MySql && self.options.fk_checks_enabled {
382            self.add_issue(ValidationIssue::info(
383                "FK_CHECK_UNSUPPORTED",
384                format!(
385                    "PK/FK data integrity checks are only supported for MySQL dumps; skipping for {}",
386                    self.dialect
387                ),
388            ));
389        }
390
391        Ok(self.build_summary())
392    }
393
394    fn process_statement(&mut self, stmt: &[u8]) {
395        // Check encoding
396        if std::str::from_utf8(stmt).is_err() {
397            self.add_issue(
398                ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
399                    .with_location(Location::new().with_statement(self.statement_count)),
400            );
401        }
402
403        let (stmt_type, table_name) =
404            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);
405
406        match stmt_type {
407            StatementType::CreateTable => {
408                if !table_name.is_empty() {
409                    self.tables_from_ddl.insert(table_name.clone());
410
411                    // For MySQL, parse the CREATE TABLE for schema info
412                    if self.dialect == SqlDialect::MySql {
413                        if let Ok(stmt_str) = std::str::from_utf8(stmt) {
414                            self.schema_builder.parse_create_table(stmt_str);
415                        }
416                    }
417                }
418            }
419            StatementType::AlterTable => {
420                // For MySQL, parse ALTER TABLE for FK constraints
421                if self.dialect == SqlDialect::MySql {
422                    if let Ok(stmt_str) = std::str::from_utf8(stmt) {
423                        self.schema_builder.parse_alter_table(stmt_str);
424                    }
425                }
426            }
427            StatementType::Insert | StatementType::Copy => {
428                if !table_name.is_empty() {
429                    self.tables_from_dml
430                        .push((table_name, self.statement_count));
431                }
432            }
433            StatementType::Unknown => {
434                // Could be a session command or comment - not an error
435            }
436            _ => {}
437        }
438    }
439
440    fn initialize_table_states(&mut self) {
441        let schema = match &self.schema {
442            Some(s) => s,
443            None => return,
444        };
445
446        for table_schema in schema.iter() {
447            let pk_indices: Vec<usize> = table_schema
448                .primary_key
449                .iter()
450                .map(|col_id| col_id.0 as usize)
451                .collect();
452
453            let state = TableState::new().with_pk_columns(pk_indices);
454            self.table_states.insert(table_schema.id, state);
455        }
456    }
457
458    fn run_data_checks(&mut self) -> anyhow::Result<()> {
459        let file = File::open(&self.options.path)?;
460        let file_size = file.metadata()?.len();
461        let buffer_size = determine_buffer_size(file_size);
462
463        let compression = Compression::from_path(&self.options.path);
464        let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file));
465
466        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
467        let mut stmt_count: u64 = 0;
468
469        while let Some(stmt) = parser.read_statement()? {
470            stmt_count += 1;
471
472            let (stmt_type, table_name) =
473                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);
474
475            if stmt_type != StatementType::Insert {
476                continue;
477            }
478
479            let schema = match &self.schema {
480                Some(s) => s,
481                None => continue,
482            };
483
484            let table_id = match schema.get_table_id(&table_name) {
485                Some(id) => id,
486                None => continue,
487            };
488
489            let table_schema = match schema.table(table_id) {
490                Some(s) => s,
491                None => continue,
492            };
493
494            // Parse rows from INSERT using the schema
495            let rows = match mysql_insert::parse_mysql_insert_rows(&stmt, table_schema) {
496                Ok(r) => r,
497                Err(_) => continue,
498            };
499
500            for row in rows {
501                self.check_row(table_id, &table_name, &row, stmt_count);
502            }
503        }
504
505        Ok(())
506    }
507
508    fn check_row(
509        &mut self,
510        table_id: TableId,
511        table_name: &str,
512        row: &mysql_insert::ParsedRow,
513        stmt_idx: u64,
514    ) {
515        let max_rows = self.options.max_rows_per_table as u64;
516
517        let state = match self.table_states.get_mut(&table_id) {
518            Some(s) => s,
519            None => return,
520        };
521
522        state.row_count += 1;
523
524        // Check if we've exceeded max rows for this table
525        if state.row_count > max_rows {
526            if state.pk_values.is_some() {
527                state.pk_values = None;
528                self.add_issue(
529                    ValidationIssue::warning(
530                        "PK_CHECK_SKIPPED",
531                        format!(
532                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
533                            table_name, max_rows
534                        ),
535                    )
536                    .with_location(Location::new().with_table(table_name)),
537                );
538            }
539            return;
540        }
541
542        // PK duplicate check using the parsed PK from the row
543        if let Some(ref pk) = row.pk {
544            if let Some(ref mut pk_set) = state.pk_values {
545                // Convert SmallVec<[PkValue; 2]> to Vec<Vec<u8>> for our set
546                let pk_tuple: PkTuple = pk
547                    .iter()
548                    .map(|v| match v {
549                        mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
550                        mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
551                        mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
552                        mysql_insert::PkValue::Null => Vec::new(),
553                    })
554                    .collect();
555
556                if !pk_set.insert(pk_tuple.clone()) {
557                    state.pk_duplicates += 1;
558                    let pk_display: String = pk
559                        .iter()
560                        .map(|v| match v {
561                            mysql_insert::PkValue::Int(i) => i.to_string(),
562                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
563                            mysql_insert::PkValue::Text(s) => s.to_string(),
564                            mysql_insert::PkValue::Null => "NULL".to_string(),
565                        })
566                        .collect::<Vec<_>>()
567                        .join(", ");
568
569                    self.add_issue(
570                        ValidationIssue::error(
571                            "DUPLICATE_PK",
572                            format!(
573                                "Duplicate primary key in table '{}': ({})",
574                                table_name, pk_display
575                            ),
576                        )
577                        .with_location(
578                            Location::new()
579                                .with_table(table_name)
580                                .with_statement(stmt_idx),
581                        ),
582                    );
583                }
584            }
585        }
586
587        // FK integrity check using the parsed FK values
588        // First, collect FK check info without holding mutable borrows
589        struct FkCheckInfo {
590            parent_table_id: TableId,
591            fk_tuple: PkTuple,
592            fk_display: String,
593            referenced_table: String,
594        }
595
596        let fk_checks: Vec<FkCheckInfo> = {
597            let schema = match &self.schema {
598                Some(s) => s,
599                None => return,
600            };
601
602            let table_schema = match schema.table(table_id) {
603                Some(t) => t,
604                None => return,
605            };
606
607            row.fk_values
608                .iter()
609                .filter(|(_, fk_values)| !fk_values.iter().all(|v| v.is_null()))
610                .filter_map(|(fk_ref, fk_values)| {
611                    let fk_def = table_schema.foreign_keys.get(fk_ref.fk_index as usize)?;
612                    let parent_table_id = fk_def.referenced_table_id?;
613
614                    let fk_tuple: PkTuple = fk_values
615                        .iter()
616                        .map(|v| match v {
617                            mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
618                            mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
619                            mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
620                            mysql_insert::PkValue::Null => Vec::new(),
621                        })
622                        .collect();
623
624                    let fk_display: String = fk_values
625                        .iter()
626                        .map(|v| match v {
627                            mysql_insert::PkValue::Int(i) => i.to_string(),
628                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
629                            mysql_insert::PkValue::Text(s) => s.to_string(),
630                            mysql_insert::PkValue::Null => "NULL".to_string(),
631                        })
632                        .collect::<Vec<_>>()
633                        .join(", ");
634
635                    Some(FkCheckInfo {
636                        parent_table_id,
637                        fk_tuple,
638                        fk_display,
639                        referenced_table: fk_def.referenced_table.clone(),
640                    })
641                })
642                .collect()
643        };
644
645        // Now check each FK
646        for check in fk_checks {
647            let parent_has_pk = self
648                .table_states
649                .get(&check.parent_table_id)
650                .and_then(|s| s.pk_values.as_ref())
651                .is_some_and(|set| set.contains(&check.fk_tuple));
652
653            if !parent_has_pk {
654                let state = self.table_states.get_mut(&table_id).unwrap();
655                state.fk_missing_parents += 1;
656
657                // Only add issue for first few violations per table
658                if state.fk_missing_parents <= 5 {
659                    self.add_issue(
660                        ValidationIssue::error(
661                            "FK_MISSING_PARENT",
662                            format!(
663                                "FK violation in '{}': ({}) references missing row in '{}'",
664                                table_name, check.fk_display, check.referenced_table
665                            ),
666                        )
667                        .with_location(
668                            Location::new()
669                                .with_table(table_name)
670                                .with_statement(stmt_idx),
671                        ),
672                    );
673                }
674            }
675        }
676    }
677
678    fn build_summary(&self) -> ValidationSummary {
679        let errors = self
680            .issues
681            .iter()
682            .filter(|i| matches!(i.severity, Severity::Error))
683            .count();
684        let warnings = self
685            .issues
686            .iter()
687            .filter(|i| matches!(i.severity, Severity::Warning))
688            .count();
689        let info = self
690            .issues
691            .iter()
692            .filter(|i| matches!(i.severity, Severity::Info))
693            .count();
694
695        let syntax_status = if self.syntax_errors > 0 {
696            CheckStatus::Failed(self.syntax_errors)
697        } else {
698            CheckStatus::Ok
699        };
700
701        let encoding_status = if self.encoding_warnings > 0 {
702            CheckStatus::Failed(self.encoding_warnings)
703        } else {
704            CheckStatus::Ok
705        };
706
707        let ddl_dml_status = if self.ddl_dml_errors > 0 {
708            CheckStatus::Failed(self.ddl_dml_errors)
709        } else {
710            CheckStatus::Ok
711        };
712
713        let pk_status = if self.dialect != SqlDialect::MySql {
714            CheckStatus::Skipped("MySQL only".to_string())
715        } else if !self.options.fk_checks_enabled {
716            CheckStatus::Skipped("--no-fk-checks".to_string())
717        } else if self.pk_errors > 0 {
718            CheckStatus::Failed(self.pk_errors)
719        } else {
720            CheckStatus::Ok
721        };
722
723        let fk_status = if self.dialect != SqlDialect::MySql {
724            CheckStatus::Skipped("MySQL only".to_string())
725        } else if !self.options.fk_checks_enabled {
726            CheckStatus::Skipped("--no-fk-checks".to_string())
727        } else if self.fk_errors > 0 {
728            CheckStatus::Failed(self.fk_errors)
729        } else {
730            CheckStatus::Ok
731        };
732
733        ValidationSummary {
734            dialect: self.dialect.to_string(),
735            issues: self.issues.clone(),
736            summary: SummaryStats {
737                errors,
738                warnings,
739                info,
740                tables_scanned: self.tables_from_ddl.len(),
741                statements_scanned: self.statement_count,
742            },
743            checks: CheckResults {
744                syntax: syntax_status,
745                encoding: encoding_status,
746                ddl_dml_consistency: ddl_dml_status,
747                pk_duplicates: pk_status,
748                fk_integrity: fk_status,
749            },
750        }
751    }
752}