use crate::parser::{
    determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
};
use crate::progress::ProgressReader;
use crate::schema::{Schema, SchemaBuilder, TableId};
use crate::splitter::Compression;
use ahash::{AHashMap, AHashSet};
use serde::Serialize;
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;

/// Cap on the number of issues recorded; once reached, further issues are dropped.
const MAX_ISSUES: usize = 1000;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    Error,
    Warning,
    Info,
}

impl fmt::Display for Severity {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Severity::Error => write!(f, "ERROR"),
            Severity::Warning => write!(f, "WARNING"),
            Severity::Info => write!(f, "INFO"),
        }
    }
}

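/// Optional context attached to a [`ValidationIssue`]: the table, statement
/// index, and/or approximate line where the issue was observed.
///
/// A minimal sketch of the builder style used throughout this module (the
/// table name is illustrative):
///
/// ```ignore
/// let loc = Location::new().with_table("users").with_statement(42);
/// ```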
#[derive(Debug, Clone, Serialize)]
pub struct Location {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub statement_index: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approx_line: Option<u64>,
}

impl Location {
    pub fn new() -> Self {
        Self {
            table: None,
            statement_index: None,
            approx_line: None,
        }
    }

    pub fn with_table(mut self, table: impl Into<String>) -> Self {
        self.table = Some(table.into());
        self
    }

    pub fn with_statement(mut self, index: u64) -> Self {
        self.statement_index = Some(index);
        self
    }

    #[allow(dead_code)]
    pub fn with_line(mut self, line: u64) -> Self {
        self.approx_line = Some(line);
        self
    }
}

impl Default for Location {
    fn default() -> Self {
        Self::new()
    }
}

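/// A single finding produced by the validator: a stable `code` string
/// (e.g. "SYNTAX", "DUPLICATE_PK"), a severity, a message, and an optional location.
///
/// A minimal sketch of how issues are constructed in this module:
///
/// ```ignore
/// let issue = ValidationIssue::error("DUPLICATE_PK", "Duplicate primary key")
///     .with_location(Location::new().with_table("users").with_statement(7));
/// ```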
#[derive(Debug, Clone, Serialize)]
pub struct ValidationIssue {
    pub code: &'static str,
    pub severity: Severity,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub location: Option<Location>,
}

impl ValidationIssue {
    pub fn error(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            severity: Severity::Error,
            message: message.into(),
            location: None,
        }
    }

    pub fn warning(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            severity: Severity::Warning,
            message: message.into(),
            location: None,
        }
    }

    pub fn info(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            severity: Severity::Info,
            message: message.into(),
            location: None,
        }
    }

    pub fn with_location(mut self, location: Location) -> Self {
        self.location = Some(location);
        self
    }
}

impl fmt::Display for ValidationIssue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} [{}]", self.severity, self.code)?;
        if let Some(ref loc) = self.location {
            if let Some(ref table) = loc.table {
                write!(f, " table={}", table)?;
            }
            if let Some(stmt) = loc.statement_index {
                write!(f, " stmt={}", stmt)?;
            }
            if let Some(line) = loc.approx_line {
                write!(f, " line~{}", line)?;
            }
        }
        write!(f, ": {}", self.message)
    }
}

/// Options controlling a single validation run.
#[derive(Debug, Clone)]
pub struct ValidateOptions {
    pub path: PathBuf,
    pub dialect: Option<SqlDialect>,
    pub progress: bool,
    pub strict: bool,
    pub json: bool,
    pub max_rows_per_table: usize,
    pub fk_checks_enabled: bool,
}

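/// Final report returned by [`Validator::validate`].
///
/// An abridged sketch of the serialized shape, assuming default serde
/// behaviour for the derives below (field values are illustrative):
///
/// ```json
/// {
///   "dialect": "<dialect>",
///   "issues": [{ "code": "DUPLICATE_PK", "severity": "error", "message": "..." }],
///   "summary": { "errors": 1, "warnings": 0, "info": 0, "tables_scanned": 3, "statements_scanned": 120 },
///   "checks": { "syntax": "ok", "pk_duplicates": { "failed": 1 } }
/// }
/// ```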
#[derive(Debug, Serialize)]
pub struct ValidationSummary {
    pub dialect: String,
    pub issues: Vec<ValidationIssue>,
    pub summary: SummaryStats,
    pub checks: CheckResults,
}

#[derive(Debug, Serialize)]
pub struct SummaryStats {
    pub errors: usize,
    pub warnings: usize,
    pub info: usize,
    pub tables_scanned: usize,
    pub statements_scanned: u64,
}

#[derive(Debug, Serialize)]
pub struct CheckResults {
    pub syntax: CheckStatus,
    pub encoding: CheckStatus,
    pub ddl_dml_consistency: CheckStatus,
    pub pk_duplicates: CheckStatus,
    pub fk_integrity: CheckStatus,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum CheckStatus {
    Ok,
    Failed(usize),
    Skipped(String),
}

impl fmt::Display for CheckStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CheckStatus::Ok => write!(f, "OK"),
            CheckStatus::Failed(n) => write!(f, "{} issues", n),
            CheckStatus::Skipped(reason) => write!(f, "Skipped ({})", reason),
        }
    }
}

impl ValidationSummary {
    pub fn has_errors(&self) -> bool {
        self.summary.errors > 0
    }

    pub fn has_warnings(&self) -> bool {
        self.summary.warnings > 0
    }
}

/// A tuple of key column values, each rendered to bytes, used for set lookups.
type PkTuple = Vec<Vec<u8>>;

/// A child-row foreign-key reference recorded during the data pass and
/// verified once all parent primary keys have been collected.
struct PendingFkCheck {
    child_table_id: TableId,
    child_table_name: String,
    parent_table_id: TableId,
    parent_table_name: String,
    fk_tuple: PkTuple,
    fk_display: String,
    stmt_idx: u64,
}

/// Per-table counters and the primary-key set used for duplicate and FK checks.
struct TableState {
    row_count: u64,
    pk_values: Option<AHashSet<PkTuple>>,
    pk_column_indices: Vec<usize>,
    pk_duplicates: u64,
    fk_missing_parents: u64,
}

impl TableState {
    fn new() -> Self {
        Self {
            row_count: 0,
            pk_values: Some(AHashSet::new()),
            pk_column_indices: Vec::new(),
            pk_duplicates: 0,
            fk_missing_parents: 0,
        }
    }

    fn with_pk_columns(mut self, indices: Vec<usize>) -> Self {
        self.pk_column_indices = indices;
        self
    }
}

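/// Streaming validator for SQL dump files.
///
/// The dump is scanned in up to two passes: the first pass checks syntax and
/// encoding and builds the schema from DDL; a second pass (only when FK checks
/// are enabled and the dump defined tables) re-reads the file to check
/// primary-key uniqueness and foreign-key integrity.
///
/// A minimal usage sketch (field values are illustrative only):
///
/// ```ignore
/// let summary = Validator::new(ValidateOptions {
///     path: "dump.sql".into(),
///     dialect: None, // defaults to MySQL
///     progress: false,
///     strict: false,
///     json: false,
///     max_rows_per_table: 1_000_000,
///     fk_checks_enabled: true,
/// })
/// .validate()?;
/// assert!(!summary.has_errors());
/// ```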
pub struct Validator {
    options: ValidateOptions,
    issues: Vec<ValidationIssue>,
    dialect: SqlDialect,

    // DDL/DML consistency tracking (first pass).
    tables_from_ddl: AHashSet<String>,
    tables_from_dml: Vec<(String, u64)>,

    // Schema built from DDL, plus per-table state for the data pass.
    schema_builder: SchemaBuilder,
    schema: Option<Schema>,
    table_states: AHashMap<TableId, TableState>,
    pending_fk_checks: Vec<PendingFkCheck>,

    progress_fn: Option<Arc<dyn Fn(u64) + Send + Sync>>,

    // Per-check counters for the final report.
    statement_count: u64,
    syntax_errors: usize,
    encoding_warnings: usize,
    ddl_dml_errors: usize,
    pk_errors: usize,
    fk_errors: usize,
}

impl Validator {
    pub fn new(options: ValidateOptions) -> Self {
        Self {
            dialect: options.dialect.unwrap_or(SqlDialect::MySql),
            options,
            issues: Vec::new(),
            tables_from_ddl: AHashSet::new(),
            tables_from_dml: Vec::new(),
            schema_builder: SchemaBuilder::new(),
            schema: None,
            table_states: AHashMap::new(),
            pending_fk_checks: Vec::new(),
            progress_fn: None,
            statement_count: 0,
            syntax_errors: 0,
            encoding_warnings: 0,
            ddl_dml_errors: 0,
            pk_errors: 0,
            fk_errors: 0,
        }
    }

    pub fn with_progress<F>(mut self, f: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        self.progress_fn = Some(Arc::new(f));
        self
    }

    fn add_issue(&mut self, issue: ValidationIssue) {
        // Stop recording once the cap is reached; later issues are dropped.
        if self.issues.len() >= MAX_ISSUES {
            return;
        }

        match issue.severity {
            Severity::Error => match issue.code {
                "SYNTAX" => self.syntax_errors += 1,
                "DDL_MISSING_TABLE" => self.ddl_dml_errors += 1,
                "DUPLICATE_PK" => self.pk_errors += 1,
                "FK_MISSING_PARENT" => self.fk_errors += 1,
                _ => {}
            },
            Severity::Warning => {
                if issue.code == "ENCODING" {
                    self.encoding_warnings += 1;
                }
            }
            Severity::Info => {}
        }

        self.issues.push(issue);
    }

    pub fn validate(mut self) -> anyhow::Result<ValidationSummary> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                // First pass reports the first half of the progress range;
                // the data pass (if any) reports the second half.
                cb(bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))
        } else {
            compression.wrap_reader(Box::new(file))
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);

        loop {
            match parser.read_statement() {
                Ok(Some(stmt)) => {
                    self.statement_count += 1;
                    self.process_statement(&stmt);
                }
                Ok(None) => break,
                Err(e) => {
                    self.add_issue(
                        ValidationIssue::error("SYNTAX", format!("Parser error: {}", e))
                            .with_location(
                                Location::new().with_statement(self.statement_count + 1),
                            ),
                    );
                    break;
                }
            }
        }

        // DDL/DML consistency: every table referenced by INSERT/COPY must have a CREATE TABLE.
        let missing_table_issues: Vec<_> = self
            .tables_from_dml
            .iter()
            .filter(|(table, _)| {
                let table_lower = table.to_lowercase();
                !self
                    .tables_from_ddl
                    .iter()
                    .any(|t| t.to_lowercase() == table_lower)
            })
            .map(|(table, stmt_idx)| {
                ValidationIssue::error(
                    "DDL_MISSING_TABLE",
                    format!(
                        "INSERT/COPY references table '{}' with no CREATE TABLE",
                        table
                    ),
                )
                .with_location(Location::new().with_table(table).with_statement(*stmt_idx))
            })
            .collect();

        for issue in missing_table_issues {
            self.add_issue(issue);
        }

        if self.options.fk_checks_enabled {
            self.schema = Some(self.schema_builder.build());
            // The builder is no longer needed once the schema is built.
            self.schema_builder = SchemaBuilder::new();
            self.initialize_table_states();
        }

        let schema_not_empty = self.schema.as_ref().is_some_and(|s| !s.is_empty());
        if self.options.fk_checks_enabled && schema_not_empty {
            self.run_data_checks()?;
            self.validate_pending_fk_checks();
        }

        Ok(self.build_summary())
    }

    fn process_statement(&mut self, stmt: &[u8]) {
        if std::str::from_utf8(stmt).is_err() {
            self.add_issue(
                ValidationIssue::warning("ENCODING", "Statement contains invalid UTF-8 bytes")
                    .with_location(Location::new().with_statement(self.statement_count)),
            );
        }

        let (stmt_type, table_name) =
            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.dialect);

        match stmt_type {
            StatementType::CreateTable => {
                if !table_name.is_empty() {
                    self.tables_from_ddl.insert(table_name.clone());

                    if let Ok(stmt_str) = std::str::from_utf8(stmt) {
                        self.schema_builder.parse_create_table(stmt_str);
                    }
                }
            }
            StatementType::AlterTable => {
                if let Ok(stmt_str) = std::str::from_utf8(stmt) {
                    self.schema_builder.parse_alter_table(stmt_str);
                }
            }
            StatementType::Insert | StatementType::Copy => {
                if !table_name.is_empty() {
                    self.tables_from_dml
                        .push((table_name, self.statement_count));
                }
            }
            StatementType::Unknown => {
                // Not an error: statements we do not recognize are simply skipped.
            }
            _ => {}
        }
    }

    fn initialize_table_states(&mut self) {
        let schema = match &self.schema {
            Some(s) => s,
            None => return,
        };

        for table_schema in schema.iter() {
            let pk_indices: Vec<usize> = table_schema
                .primary_key
                .iter()
                .map(|col_id| col_id.0 as usize)
                .collect();

            let state = TableState::new().with_pk_columns(pk_indices);
            self.table_states.insert(table_schema.id, state);
        }
    }

    fn run_data_checks(&mut self) -> anyhow::Result<()> {
        let file = File::open(&self.options.path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);

        let compression = Compression::from_path(&self.options.path);
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                // Second pass: report the second half of the progress range.
                cb(file_size / 2 + bytes / 2)
            });
            compression.wrap_reader(Box::new(progress_reader))
        } else {
            compression.wrap_reader(Box::new(file))
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
        let mut stmt_count: u64 = 0;

        while let Some(stmt) = parser.read_statement()? {
            stmt_count += 1;

            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);

            // Only rows for tables present in the schema are checked.
            let table_id = match &self.schema {
                Some(s) => match s.get_table_id(&table_name) {
                    Some(id) => id,
                    None => continue,
                },
                None => continue,
            };

            match stmt_type {
                StatementType::Insert => {
                    self.check_insert_statement(&stmt, table_id, &table_name, stmt_count);
                }
                StatementType::Copy => {
                    self.check_copy_statement(&stmt, table_id, &table_name, stmt_count);
                }
                _ => continue,
            }
        }

        Ok(())
    }

    fn check_insert_statement(
        &mut self,
        stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        stmt_count: u64,
    ) {
        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        let rows = match mysql_insert::parse_mysql_insert_rows(stmt, table_schema) {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_mysql_row(table_id, table_name, &row, stmt_count);
        }
    }

    fn check_copy_statement(
        &mut self,
        stmt: &[u8],
        table_id: TableId,
        table_name: &str,
        stmt_count: u64,
    ) {
        let stmt_str = match std::str::from_utf8(stmt) {
            Ok(s) => s,
            Err(_) => return,
        };

        // The COPY payload begins right after the "FROM stdin;" marker.
        let data_start = if let Some(pos) = stmt_str.find("FROM stdin;") {
            pos + "FROM stdin;".len()
        } else if let Some(pos) = stmt_str.find("from stdin;") {
            pos + "from stdin;".len()
        } else {
            return;
        };

        let data_section = stmt_str[data_start..].trim_start();
        if data_section.is_empty() {
            return;
        }

        let header = &stmt_str[..data_start];
        let column_order = postgres_copy::parse_copy_columns(header);

        let table_schema = match &self.schema {
            Some(s) => match s.table(table_id) {
                Some(ts) => ts,
                None => return,
            },
            None => return,
        };

        let rows = match postgres_copy::parse_postgres_copy_rows(
            data_section.as_bytes(),
            table_schema,
            column_order,
        ) {
            Ok(r) => r,
            Err(_) => return,
        };

        for row in rows {
            self.check_copy_row(table_id, table_name, &row, stmt_count);
        }
    }

    fn check_mysql_row(
        &mut self,
        table_id: TableId,
        table_name: &str,
        row: &mysql_insert::ParsedRow,
        stmt_idx: u64,
    ) {
        self.check_row_common(
            table_id,
            table_name,
            row.pk.as_ref(),
            &row.fk_values,
            stmt_idx,
        );
    }

    fn check_copy_row(
        &mut self,
        table_id: TableId,
        table_name: &str,
        row: &postgres_copy::ParsedCopyRow,
        stmt_idx: u64,
    ) {
        self.check_row_common(
            table_id,
            table_name,
            row.pk.as_ref(),
            &row.fk_values,
            stmt_idx,
        );
    }

    fn check_row_common(
        &mut self,
        table_id: TableId,
        table_name: &str,
        pk: Option<&smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
        fk_values: &[(mysql_insert::FkRef, smallvec::SmallVec<[mysql_insert::PkValue; 2]>)],
        stmt_idx: u64,
    ) {
        let max_rows = self.options.max_rows_per_table as u64;

        let state = match self.table_states.get_mut(&table_id) {
            Some(s) => s,
            None => return,
        };

        state.row_count += 1;

        // Beyond the per-table row cap, drop the PK set and stop checking this table.
        if state.row_count > max_rows {
            if state.pk_values.is_some() {
                state.pk_values = None;
                self.add_issue(
                    ValidationIssue::warning(
                        "PK_CHECK_SKIPPED",
                        format!(
                            "Skipping PK/FK checks for table '{}' after {} rows (increase --max-rows-per-table)",
                            table_name, max_rows
                        ),
                    )
                    .with_location(Location::new().with_table(table_name)),
                );
            }
            return;
        }

        // Primary-key uniqueness: canonicalize the key tuple to bytes and insert it into the set.
        if let Some(pk_values) = pk {
            if let Some(ref mut pk_set) = state.pk_values {
                let pk_tuple: PkTuple = pk_values
                    .iter()
                    .map(|v| match v {
                        mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
                        mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
                        mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
                        mysql_insert::PkValue::Null => Vec::new(),
                    })
                    .collect();

                if !pk_set.insert(pk_tuple.clone()) {
                    state.pk_duplicates += 1;
                    let pk_display: String = pk_values
                        .iter()
                        .map(|v| match v {
                            mysql_insert::PkValue::Int(i) => i.to_string(),
                            mysql_insert::PkValue::BigInt(i) => i.to_string(),
                            mysql_insert::PkValue::Text(s) => s.to_string(),
                            mysql_insert::PkValue::Null => "NULL".to_string(),
                        })
                        .collect::<Vec<_>>()
                        .join(", ");

                    self.add_issue(
                        ValidationIssue::error(
                            "DUPLICATE_PK",
                            format!(
                                "Duplicate primary key in table '{}': ({})",
                                table_name, pk_display
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(table_name)
                                .with_statement(stmt_idx),
                        ),
                    );
                }
            }
        }

        let schema = match &self.schema {
            Some(s) => s,
            None => return,
        };

        let table_schema = match schema.table(table_id) {
            Some(t) => t,
            None => return,
        };

        // Foreign keys: record each non-NULL reference for later verification,
        // using the same byte canonicalization as the parent PK set.
        for (fk_ref, fk_vals) in fk_values.iter() {
            if fk_vals.iter().all(|v| v.is_null()) {
                continue;
            }

            let fk_def = match table_schema.foreign_keys.get(fk_ref.fk_index as usize) {
                Some(fk) => fk,
                None => continue,
            };

            let parent_table_id = match fk_def.referenced_table_id {
                Some(id) => id,
                None => continue,
            };

            let fk_tuple: PkTuple = fk_vals
                .iter()
                .map(|v| match v {
                    mysql_insert::PkValue::Int(i) => i.to_string().into_bytes(),
                    mysql_insert::PkValue::BigInt(i) => i.to_string().into_bytes(),
                    mysql_insert::PkValue::Text(s) => s.as_bytes().to_vec(),
                    mysql_insert::PkValue::Null => Vec::new(),
                })
                .collect();

            let fk_display: String = fk_vals
                .iter()
                .map(|v| match v {
                    mysql_insert::PkValue::Int(i) => i.to_string(),
                    mysql_insert::PkValue::BigInt(i) => i.to_string(),
                    mysql_insert::PkValue::Text(s) => s.to_string(),
                    mysql_insert::PkValue::Null => "NULL".to_string(),
                })
                .collect::<Vec<_>>()
                .join(", ");

            self.pending_fk_checks.push(PendingFkCheck {
                child_table_id: table_id,
                child_table_name: table_name.to_string(),
                parent_table_id,
                parent_table_name: fk_def.referenced_table.clone(),
                fk_tuple,
                fk_display,
                stmt_idx,
            });
        }
    }

    fn validate_pending_fk_checks(&mut self) {
        // All parent PK sets are complete at this point, so deferred FK references can be resolved.
        for check in std::mem::take(&mut self.pending_fk_checks) {
            let parent_has_pk = self
                .table_states
                .get(&check.parent_table_id)
                .and_then(|s| s.pk_values.as_ref())
                .is_some_and(|set| set.contains(&check.fk_tuple));

            if !parent_has_pk {
                let state = match self.table_states.get_mut(&check.child_table_id) {
                    Some(s) => s,
                    None => continue,
                };
                state.fk_missing_parents += 1;

                // Report at most five missing parents per child table.
                if state.fk_missing_parents <= 5 {
                    self.add_issue(
                        ValidationIssue::error(
                            "FK_MISSING_PARENT",
                            format!(
                                "FK violation in '{}': ({}) references missing row in '{}'",
                                check.child_table_name, check.fk_display, check.parent_table_name
                            ),
                        )
                        .with_location(
                            Location::new()
                                .with_table(&check.child_table_name)
                                .with_statement(check.stmt_idx),
                        ),
                    );
                }
            }
        }
    }

    fn build_summary(&self) -> ValidationSummary {
        let errors = self
            .issues
            .iter()
            .filter(|i| matches!(i.severity, Severity::Error))
            .count();
        let warnings = self
            .issues
            .iter()
            .filter(|i| matches!(i.severity, Severity::Warning))
            .count();
        let info = self
            .issues
            .iter()
            .filter(|i| matches!(i.severity, Severity::Info))
            .count();

        let syntax_status = if self.syntax_errors > 0 {
            CheckStatus::Failed(self.syntax_errors)
        } else {
            CheckStatus::Ok
        };

        let encoding_status = if self.encoding_warnings > 0 {
            CheckStatus::Failed(self.encoding_warnings)
        } else {
            CheckStatus::Ok
        };

        let ddl_dml_status = if self.ddl_dml_errors > 0 {
            CheckStatus::Failed(self.ddl_dml_errors)
        } else {
            CheckStatus::Ok
        };

        let pk_status = if !self.options.fk_checks_enabled {
            CheckStatus::Skipped("--no-fk-checks".to_string())
        } else if self.pk_errors > 0 {
            CheckStatus::Failed(self.pk_errors)
        } else {
            CheckStatus::Ok
        };

        let fk_status = if !self.options.fk_checks_enabled {
            CheckStatus::Skipped("--no-fk-checks".to_string())
        } else if self.fk_errors > 0 {
            CheckStatus::Failed(self.fk_errors)
        } else {
            CheckStatus::Ok
        };

        ValidationSummary {
            dialect: self.dialect.to_string(),
            issues: self.issues.clone(),
            summary: SummaryStats {
                errors,
                warnings,
                info,
                tables_scanned: self.tables_from_ddl.len(),
                statements_scanned: self.statement_count,
            },
            checks: CheckResults {
                syntax: syntax_status,
                encoding: encoding_status,
                ddl_dml_consistency: ddl_dml_status,
                pk_duplicates: pk_status,
                fk_integrity: fk_status,
            },
        }
    }
}