1use crate::parser::{determine_buffer_size, mysql_insert, Parser, SqlDialect, StatementType};
11use crate::schema::{Schema, SchemaBuilder, TableId};
12use crate::splitter::Compression;
13use ahash::{AHashMap, AHashSet};
14use serde::Serialize;
15use std::fmt;
16use std::fs::File;
17use std::io::Read;
18use std::path::PathBuf;
19
/// Upper bound on the number of issues kept in a validation report.
const MAX_ISSUES: usize = 1_000;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
25#[serde(rename_all = "lowercase")]
26pub enum Severity {
27 Error,
28 Warning,
29 Info,
30}
31
32impl fmt::Display for Severity {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 match self {
35 Severity::Error => write!(f, "ERROR"),
36 Severity::Warning => write!(f, "WARNING"),
37 Severity::Info => write!(f, "INFO"),
38 }
39 }
40}
41
42#[derive(Debug, Clone, Serialize)]
44pub struct Location {
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub table: Option<String>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub statement_index: Option<u64>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub approx_line: Option<u64>,
51}
52
53impl Location {
54 pub fn new() -> Self {
55 Self {
56 table: None,
57 statement_index: None,
58 approx_line: None,
59 }
60 }
61
62 pub fn with_table(mut self, table: impl Into<String>) -> Self {
63 self.table = Some(table.into());
64 self
65 }
66
67 pub fn with_statement(mut self, index: u64) -> Self {
68 self.statement_index = Some(index);
69 self
70 }
71
72 #[allow(dead_code)]
73 pub fn with_line(mut self, line: u64) -> Self {
74 self.approx_line = Some(line);
75 self
76 }
77}
78
79impl Default for Location {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85#[derive(Debug, Clone, Serialize)]
87pub struct ValidationIssue {
88 pub code: &'static str,
89 pub severity: Severity,
90 pub message: String,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub location: Option<Location>,
93}
94
95impl ValidationIssue {
96 pub fn error(code: &'static str, message: impl Into<String>) -> Self {
97 Self {
98 code,
99 severity: Severity::Error,
100 message: message.into(),
101 location: None,
102 }
103 }
104
105 pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
106 Self {
107 code,
108 severity: Severity::Warning,
109 message: message.into(),
110 location: None,
111 }
112 }
113
114 pub fn info(code: &'static str, message: impl Into<String>) -> Self {
115 Self {
116 code,
117 severity: Severity::Info,
118 message: message.into(),
119 location: None,
120 }
121 }
122
123 pub fn with_location(mut self, location: Location) -> Self {
124 self.location = Some(location);
125 self
126 }
127}
128
129impl fmt::Display for ValidationIssue {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 write!(f, "{} [{}]", self.severity, self.code)?;
132 if let Some(ref loc) = self.location {
133 if let Some(ref table) = loc.table {
134 write!(f, " table={}", table)?;
135 }
136 if let Some(stmt) = loc.statement_index {
137 write!(f, " stmt={}", stmt)?;
138 }
139 if let Some(line) = loc.approx_line {
140 write!(f, " line~{}", line)?;
141 }
142 }
143 write!(f, ": {}", self.message)
144 }
145}
146
147#[derive(Debug, Clone)]
149pub struct ValidateOptions {
150 pub path: PathBuf,
151 pub dialect: Option<SqlDialect>,
152 pub progress: bool,
153 pub strict: bool,
154 pub json: bool,
155 pub max_rows_per_table: usize,
156 pub fk_checks_enabled: bool,
157}
158
159#[derive(Debug, Serialize)]
161pub struct ValidationSummary {
162 pub dialect: String,
163 pub issues: Vec<ValidationIssue>,
164 pub summary: SummaryStats,
165 pub checks: CheckResults,
166}
167
168#[derive(Debug, Serialize)]
169pub struct SummaryStats {
170 pub errors: usize,
171 pub warnings: usize,
172 pub info: usize,
173 pub tables_scanned: usize,
174 pub statements_scanned: u64,
175}
176
177#[derive(Debug, Serialize)]
178pub struct CheckResults {
179 pub syntax: CheckStatus,
180 pub encoding: CheckStatus,
181 pub ddl_dml_consistency: CheckStatus,
182 pub pk_duplicates: CheckStatus,
183 pub fk_integrity: CheckStatus,
184}
185
186#[derive(Debug, Serialize)]
187#[serde(rename_all = "lowercase")]
188pub enum CheckStatus {
189 Ok,
190 Failed(usize),
191 Skipped(String),
192}
193
194impl fmt::Display for CheckStatus {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 match self {
197 CheckStatus::Ok => write!(f, "OK"),
198 CheckStatus::Failed(n) => write!(f, "{} issues", n),
199 CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
200 }
201 }
202}
203
204impl ValidationSummary {
205 pub fn has_errors(&self) -> bool {
206 self.summary.errors > 0
207 }
208
209 pub fn has_warnings(&self) -> bool {
210 self.summary.warnings > 0
211 }
212}
213
/// Lookup key for PK/FK matching: one canonical byte string per key column.
type PkTuple = Vec<Vec<u8>>;
216
217struct TableState {
219 row_count: u64,
220 pk_values: Option<AHashSet<PkTuple>>,
221 pk_column_indices: Vec<usize>,
222 pk_duplicates: u64,
223 fk_missing_parents: u64,
224}
225
226impl TableState {
227 fn new() -> Self {
228 Self {
229 row_count: 0,
230 pk_values: Some(AHashSet::new()),
231 pk_column_indices: Vec::new(),
232 pk_duplicates: 0,
233 fk_missing_parents: 0,
234 }
235 }
236
237 fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
238 self.pk_column_indices = indices;
239 self
240 }
241}
242
243pub struct Validator {
245 options: ValidateOptions,
246 issues: Vec<ValidationIssue>,
247 dialect: SqlDialect,
248
249 tables_from_ddl: AHashSet<String>,
251 tables_from_dml: Vec<(String, u64)>, schema_builder: SchemaBuilder,
255 schema: Option<Schema>,
256
257 table_states: AHashMap<TableId, TableState>,
259
260 statement_count: u64,
262 syntax_errors: usize,
263 encoding_warnings: usize,
264 ddl_dml_errors: usize,
265 pk_errors: usize,
266 fk_errors: usize,
267}
268
269impl Validator {
270 pub fn new(options: ValidateOptions) -> Self {
271 Self {
272 dialect: options.dialect.unwrap_or(SqlDialect::MySql),
273 options,
274 issues: Vec::new(),
275 tables_from_ddl: AHashSet::new(),
276 tables_from_dml: Vec::new(),
277 schema_builder: SchemaBuilder::new(),
278 schema: None,
279 table_states: AHashMap::new(),
280 statement_count: 0,
281 syntax_errors: 0,
282 encoding_warnings: 0,
283 ddl_dml_errors: 0,
284 pk_errors: 0,
285 fk_errors: 0,
286 }
287 }
288
289 fn add_issue(&mut self, issue: ValidationIssue) {
290 if self.issues.len() >= MAX_ISSUES {
291 return;
292 }
293
294 match issue.severity {
295 Severity::Error => match issue.code {
296 "SYNTAX" => self.syntax_errors += 1,
297 "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
298 "DUPLICATE_PK" => self.pk_errors += 1,
299 "FK_MISSING_PARENT" => self.fk_errors += 1,
300 _ => {}
301 },
302 Severity::Warning => {
303 if issue.code == "ENCODING" {
304 self.encoding_warnings += 1;
305 }
306 }
307 Severity::Info => {}
308 }
309
310 self.issues.push(issue);
311 }
312
313 pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
314 let file = File::open(&self.options.path)?;
315 let file_size = file.metadata()?.len();
316 let buffer_size = determine_buffer_size(file_size);
317
318 let compression = Compression::from_path(&self.options.path);
319 let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file));
320
321 let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
322
323 loop {
325 match parser.read_statement() {
326 Ok(Some(stmt)) => {
327 self.statement_count += 1;
328 self.process_statement(&stmt);
329 }
330 Ok(None) => break,
331 Err(e) => {
332 self.add_issue(
333 ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
334 .with_location(
335 Location::new().with_statement(self.statement_count + 1),
336 ),
337 );
338 break;
339 }
340 }
341 }
342
343 let missing_table_issues: Vec<_> = self
345 .tables_from_dml
346 .iter()
347 .filter(|(table, _)| {
348 let table_lower = table.to_lowercase();
349 !self
350 .tables_from_ddl
351 .iter()
352 .any(|t| t.to_lowercase() == table_lower)
353 })
354 .map(|(table, stmt_idx)| {
355 ValidationIssue::error(
356 "DDL_MISSING_TABLE",
357 format!(
358 "INSERT/COPY references table '{}' with no CREATE TABLE",
359 table
360 ),
361 )
362 .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
363 })
364 .collect();
365
366 for issue in missing_table_issues {
367 self.add_issue(issue);
368 }
369
370 if self.dialect == SqlDialect::MySql && self.options.fk_checks_enabled {
372 self.schema = Some(self.schema_builder.build());
373 self.schema_builder = SchemaBuilder::new(); self.initialize_table_states();
375 }
376
377 let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
379 if self.dialect == SqlDialect::MySql && self.options.fk_checks_enabled && schema_not_empty {
380 self.run_data_checks()?;
381 } else if self.dialect != SqlDialect::MySql && self.options.fk_checks_enabled {
382 self.add_issue(ValidationIssue::info(
383 "FK_CHECK_UNSUPPORTED",
384 format!(
385 "PK/FK data integrity checks are only supported for MySQL dumps; skipping for {}",
386 self.dialect
387 ),
388 ));
389 }
390
391 Ok(self.build_summary())
392 }
393
394 fn process_statement(&mut self, stmt: &[u8]) {
395 if std::str::from_utf8(stmt).is_err() {
397 self.add_issue(
398 ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
399 .with_location(Location::new().with_statement(self.statement_count)),
400 );
401 }
402
403 let (stmt_type, table_name) =
404 Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);
405
406 match stmt_type {
407 StatementType::CreateTable => {
408 if !table_name.is_empty() {
409 self.tables_from_ddl.insert(table_name.clone());
410
411 if self.dialect == SqlDialect::MySql {
413 if let Ok(stmt_str) = std::str::from_utf8(stmt) {
414 self.schema_builder.parse_create_table(stmt_str);
415 }
416 }
417 }
418 }
419 StatementType::AlterTable => {
420 if self.dialect == SqlDialect::MySql {
422 if let Ok(stmt_str) = std::str::from_utf8(stmt) {
423 self.schema_builder.parse_alter_table(stmt_str);
424 }
425 }
426 }
427 StatementType::Insert | StatementType::Copy => {
428 if !table_name.is_empty() {
429 self.tables_from_dml
430 .push((table_name, self.statement_count));
431 }
432 }
433 StatementType::Unknown => {
434 }
436 _ => {}
437 }
438 }
439
440 fn initialize_table_states(&mut self) {
441 let schema = match &self.schema {
442 Some(s) => s,
443 None => return,
444 };
445
446 for table_schema in schema.iter() {
447 let pk_indices: Vec<usize> = table_schema
448 .primary_key
449 .iter()
450 .map(|col_id| col_id.0 as usize)
451 .collect();
452
453 let state = TableState::new().with_pk_columns(pk_indices);
454 self.table_states.insert(table_schema.id, state);
455 }
456 }
457
458 fn run_data_checks(&mut self) -> anyhow::Result<()> {
459 let file = File::open(&self.options.path)?;
460 let file_size = file.metadata()?.len();
461 let buffer_size = determine_buffer_size(file_size);
462
463 let compression = Compression::from_path(&self.options.path);
464 let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file));
465
466 let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
467 let mut stmt_count: u64 = 0;
468
469 while let Some(stmt) = parser.read_statement()? {
470 stmt_count += 1;
471
472 let (stmt_type, table_name) =
473 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);
474
475 if stmt_type != StatementType::Insert {
476 continue;
477 }
478
479 let schema = match &self.schema {
480 Some(s) => s,
481 None => continue,
482 };
483
484 let table_id = match schema.get_table_id(&table_name) {
485 Some(id) => id,
486 None => continue,
487 };
488
489 let table_schema = match schema.table(table_id) {
490 Some(s) => s,
491 None => continue,
492 };
493
494 let rows = match mysql_insert::parse_mysql_insert_rows(&stmt, table_schema) {
496 Ok(r) => r,
497 Err(_) => continue,
498 };
499
500 for row in rows {
501 self.check_row(table_id, &table_name, &row, stmt_count);
502 }
503 }
504
505 Ok(())
506 }
507
508 fn check_row(
509 &mut self,
510 table_id: TableId,
511 table_name: &str,
512 row: &mysql_insert::ParsedRow,
513 stmt_idx: u64,
514 ) {
515 let max_rows = self.options.max_rows_per_table as u64;
516
517 let state = match self.table_states.get_mut(&table_id) {
518 Some(s) => s,
519 None => return,
520 };
521
522 state.row_count += 1;
523
524 if state.row_count > max_rows {
526 if state.pk_values.is_some() {
527 state.pk_values = None;
528 self.add_issue(
529 ValidationIssue::warning(
530 "PK_CHECK_SKIPPED",
531 format!(
532 "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
533 table_name, max_rows
534 ),
535 )
536 .with_location(Location::new().with_table(table_name)),
537 );
538 }
539 return;
540 }
541
542 if let Some(ref pk) = row.pk {
544 if let Some(ref mut pk_set) = state.pk_values {
545 let pk_tuple: PkTuple = pk
547 .iter()
548 .map(|v| match v {
549 mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
550 mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
551 mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
552 mysql_insert::PkValue::Null => Vec::new(),
553 })
554 .collect();
555
556 if !pk_set.insert(pk_tuple.clone()) {
557 state.pk_duplicates += 1;
558 let pk_display: String = pk
559 .iter()
560 .map(|v| match v {
561 mysql_insert::PkValue::Int(i) => i.to_string(),
562 mysql_insert::PkValue::BigInt(i) => i.to_string(),
563 mysql_insert::PkValue::Text(s) => s.to_string(),
564 mysql_insert::PkValue::Null => "NULL".to_string(),
565 })
566 .collect::<Vec<_>>()
567 .join(", ");
568
569 self.add_issue(
570 ValidationIssue::error(
571 "DUPLICATE_PK",
572 format!(
573 "Duplicate primary key in table '{}': ({})",
574 table_name, pk_display
575 ),
576 )
577 .with_location(
578 Location::new()
579 .with_table(table_name)
580 .with_statement(stmt_idx),
581 ),
582 );
583 }
584 }
585 }
586
587 struct FkCheckInfo {
590 parent_table_id: TableId,
591 fk_tuple: PkTuple,
592 fk_display: String,
593 referenced_table: String,
594 }
595
596 let fk_checks: Vec<FkCheckInfo> = {
597 let schema = match &self.schema {
598 Some(s) => s,
599 None => return,
600 };
601
602 let table_schema = match schema.table(table_id) {
603 Some(t) => t,
604 None => return,
605 };
606
607 row.fk_values
608 .iter()
609 .filter(|(_, fk_values)| !fk_values.iter().all(|v| v.is_null()))
610 .filter_map(|(fk_ref, fk_values)| {
611 let fk_def = table_schema.foreign_keys.get(fk_ref.fk_index as usize)?;
612 let parent_table_id = fk_def.referenced_table_id?;
613
614 let fk_tuple: PkTuple = fk_values
615 .iter()
616 .map(|v| match v {
617 mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
618 mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
619 mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
620 mysql_insert::PkValue::Null => Vec::new(),
621 })
622 .collect();
623
624 let fk_display: String = fk_values
625 .iter()
626 .map(|v| match v {
627 mysql_insert::PkValue::Int(i) => i.to_string(),
628 mysql_insert::PkValue::BigInt(i) => i.to_string(),
629 mysql_insert::PkValue::Text(s) => s.to_string(),
630 mysql_insert::PkValue::Null => "NULL".to_string(),
631 })
632 .collect::<Vec<_>>()
633 .join(", ");
634
635 Some(FkCheckInfo {
636 parent_table_id,
637 fk_tuple,
638 fk_display,
639 referenced_table: fk_def.referenced_table.clone(),
640 })
641 })
642 .collect()
643 };
644
645 for check in fk_checks {
647 let parent_has_pk = self
648 .table_states
649 .get(&check.parent_table_id)
650 .and_then(|s| s.pk_values.as_ref())
651 .is_some_and(|set| set.contains(&check.fk_tuple));
652
653 if !parent_has_pk {
654 let state = self.table_states.get_mut(&table_id).unwrap();
655 state.fk_missing_parents += 1;
656
657 if state.fk_missing_parents <= 5 {
659 self.add_issue(
660 ValidationIssue::error(
661 "FK_MISSING_PARENT",
662 format!(
663 "FK violation in '{}': ({}) references missing row in '{}'",
664 table_name, check.fk_display, check.referenced_table
665 ),
666 )
667 .with_location(
668 Location::new()
669 .with_table(table_name)
670 .with_statement(stmt_idx),
671 ),
672 );
673 }
674 }
675 }
676 }
677
678 fn build_summary(&self) -> ValidationSummary {
679 let errors = self
680 .issues
681 .iter()
682 .filter(|i| matches!(i.severity, Severity::Error))
683 .count();
684 let warnings = self
685 .issues
686 .iter()
687 .filter(|i| matches!(i.severity, Severity::Warning))
688 .count();
689 let info = self
690 .issues
691 .iter()
692 .filter(|i| matches!(i.severity, Severity::Info))
693 .count();
694
695 let syntax_status = if self.syntax_errors > 0 {
696 CheckStatus::Failed(self.syntax_errors)
697 } else {
698 CheckStatus::Ok
699 };
700
701 let encoding_status = if self.encoding_warnings > 0 {
702 CheckStatus::Failed(self.encoding_warnings)
703 } else {
704 CheckStatus::Ok
705 };
706
707 let ddl_dml_status = if self.ddl_dml_errors > 0 {
708 CheckStatus::Failed(self.ddl_dml_errors)
709 } else {
710 CheckStatus::Ok
711 };
712
713 let pk_status = if self.dialect != SqlDialect::MySql {
714 CheckStatus::Skipped("MySQL only".to_string())
715 } else if !self.options.fk_checks_enabled {
716 CheckStatus::Skipped("--no-fk-checks".to_string())
717 } else if self.pk_errors > 0 {
718 CheckStatus::Failed(self.pk_errors)
719 } else {
720 CheckStatus::Ok
721 };
722
723 let fk_status = if self.dialect != SqlDialect::MySql {
724 CheckStatus::Skipped("MySQL only".to_string())
725 } else if !self.options.fk_checks_enabled {
726 CheckStatus::Skipped("--no-fk-checks".to_string())
727 } else if self.fk_errors > 0 {
728 CheckStatus::Failed(self.fk_errors)
729 } else {
730 CheckStatus::Ok
731 };
732
733 ValidationSummary {
734 dialect: self.dialect.to_string(),
735 issues: self.issues.clone(),
736 summary: SummaryStats {
737 errors,
738 warnings,
739 info,
740 tables_scanned: self.tables_from_ddl.len(),
741 statements_scanned: self.statement_count,
742 },
743 checks: CheckResults {
744 syntax: syntax_status,
745 encoding: encoding_status,
746 ddl_dml_consistency: ddl_dml_status,
747 pk_duplicates: pk_status,
748 fk_integrity: fk_status,
749 },
750 }
751 }
752}