1use super::{parse_ignore_patterns, should_ignore_column, should_include_table, DiffWarning};
7use crate::parser::{
8 determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
9};
10use crate::pk::{hash_pk_values, PkHash};
11use crate::progress::ProgressReader;
12use crate::schema::Schema;
13use crate::splitter::Compression;
14use ahash::AHashMap;
15use glob::Pattern;
16use serde::Serialize;
17use std::collections::HashMap;
18use std::fs::File;
19use std::hash::{Hash, Hasher};
20use std::io::Read;
21use std::path::PathBuf;
22use std::sync::Arc;
23
/// Configuration for a data-level diff between two SQL dump files.
#[derive(Debug, Clone)]
pub struct DataDiffOptions {
    /// Global cap on tracked primary-key entries summed across all tables.
    pub max_pk_entries_global: usize,
    /// Per-table cap on tracked primary-key entries.
    pub max_pk_entries_per_table: usize,
    /// Number of sample PK strings to collect per change category; 0 disables sampling.
    pub sample_size: usize,
    /// Tables to include in the diff (empty means all tables).
    pub tables: Vec<String>,
    /// Tables to exclude from the diff.
    pub exclude: Vec<String>,
    /// Also process tables that declare no primary key.
    pub allow_no_pk: bool,
    /// Per-table primary-key column overrides, keyed by lowercase table name.
    pub pk_overrides: std::collections::HashMap<String, Vec<String>>,
    /// Column-ignore patterns (compiled to globs by `parse_ignore_patterns`).
    pub ignore_columns: Vec<String>,
}
44
45impl Default for DataDiffOptions {
46 fn default() -> Self {
47 Self {
48 max_pk_entries_global: 10_000_000,
49 max_pk_entries_per_table: 5_000_000,
50 sample_size: 0,
51 tables: Vec::new(),
52 exclude: Vec::new(),
53 allow_no_pk: false,
54 pk_overrides: std::collections::HashMap::new(),
55 ignore_columns: Vec::new(),
56 }
57 }
58}
59
/// Result of a data diff: one summary per table that showed any activity.
#[derive(Debug, Serialize)]
pub struct DataDiff {
    /// Per-table diff summaries, keyed by table name.
    pub tables: HashMap<String, TableDataDiff>,
}
66
/// Per-table summary of data-level differences.
#[derive(Debug, Serialize, Clone, Default)]
pub struct TableDataDiff {
    /// Rows seen for this table in the old dump.
    pub old_row_count: u64,
    /// Rows seen for this table in the new dump.
    pub new_row_count: u64,
    /// Rows whose PK exists only in the new dump.
    pub added_count: u64,
    /// Rows whose PK exists only in the old dump.
    pub removed_count: u64,
    /// Rows present on both sides but with differing row digests.
    pub modified_count: u64,
    /// True when a size limit stopped exact tracking; counts may then be approximate.
    pub truncated: bool,
    /// Sample of added PKs (at most `sample_size` entries).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub sample_added_pks: Vec<String>,
    /// Sample of removed PKs (at most `sample_size` entries).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub sample_removed_pks: Vec<String>,
    /// Sample of modified PKs (at most `sample_size` entries).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub sample_modified_pks: Vec<String>,
}
92
/// Per-table accumulator built while scanning one side (old or new) of the diff.
struct TableState {
    /// Total rows seen for this table.
    row_count: u64,
    /// PK hash -> full-row digest; `None` once tracking was abandoned (truncated).
    pk_to_digest: Option<AHashMap<PkHash, u64>>,
    /// PK hash -> displayable PK string; populated only when sampling is enabled.
    pk_strings: Option<AHashMap<PkHash, String>>,
    /// True when a size cap forced per-row tracking to stop for this table.
    truncated: bool,
}
105
106impl TableState {
107 fn new() -> Self {
108 Self {
109 row_count: 0,
110 pk_to_digest: Some(AHashMap::new()),
111 pk_strings: None,
112 truncated: false,
113 }
114 }
115
116 fn new_with_pk_strings() -> Self {
117 Self {
118 row_count: 0,
119 pk_to_digest: Some(AHashMap::new()),
120 pk_strings: Some(AHashMap::new()),
121 truncated: false,
122 }
123 }
124}
125
126fn hash_row_digest(values: &[mysql_insert::PkValue]) -> u64 {
128 let mut hasher = ahash::AHasher::default();
129 for v in values {
130 match v {
131 mysql_insert::PkValue::Int(i) => {
132 0u8.hash(&mut hasher);
133 i.hash(&mut hasher);
134 }
135 mysql_insert::PkValue::BigInt(i) => {
136 1u8.hash(&mut hasher);
137 i.hash(&mut hasher);
138 }
139 mysql_insert::PkValue::Text(s) => {
140 2u8.hash(&mut hasher);
141 s.hash(&mut hasher);
142 }
143 mysql_insert::PkValue::Null => {
144 3u8.hash(&mut hasher);
145 }
146 }
147 }
148 hasher.finish()
149}
150
151fn format_single_pk(v: &mysql_insert::PkValue) -> String {
153 match v {
154 mysql_insert::PkValue::Int(i) => i.to_string(),
155 mysql_insert::PkValue::BigInt(i) => i.to_string(),
156 mysql_insert::PkValue::Text(s) => s.to_string(),
157 mysql_insert::PkValue::Null => "NULL".to_string(),
158 }
159}
160
161fn format_pk_value(pk: &[mysql_insert::PkValue]) -> String {
163 if pk.len() == 1 {
164 format_single_pk(&pk[0])
165 } else {
166 let parts: Vec<String> = pk.iter().map(format_single_pk).collect();
167 format!("({})", parts.join(", "))
168 }
169}
170
171fn hash_row_digest_with_ignore(values: &[mysql_insert::PkValue], ignore_indices: &[usize]) -> u64 {
173 let mut hasher = ahash::AHasher::default();
174 for (i, v) in values.iter().enumerate() {
175 if ignore_indices.contains(&i) {
176 continue;
177 }
178 match v {
179 mysql_insert::PkValue::Int(val) => {
180 0u8.hash(&mut hasher);
181 val.hash(&mut hasher);
182 }
183 mysql_insert::PkValue::BigInt(val) => {
184 1u8.hash(&mut hasher);
185 val.hash(&mut hasher);
186 }
187 mysql_insert::PkValue::Text(s) => {
188 2u8.hash(&mut hasher);
189 s.hash(&mut hasher);
190 }
191 mysql_insert::PkValue::Null => {
192 3u8.hash(&mut hasher);
193 }
194 }
195 }
196 hasher.finish()
197}
198
/// Streaming differ: scans the old and new dumps, accumulating per-table
/// PK/row-digest maps, then computes the diff in `compute_diff`.
pub struct DataDiffer {
    options: DataDiffOptions,
    /// Per-table state collected from the "old" dump.
    old_state: AHashMap<String, TableState>,
    /// Per-table state collected from the "new" dump.
    new_state: AHashMap<String, TableState>,
    /// Total tracked PK entries across both sides, checked against the global cap.
    total_pk_entries: usize,
    /// Set once the global cap is hit; afterwards only row counts are kept.
    global_truncated: bool,
    /// Active Postgres COPY block, if any: (table name, column order).
    current_copy_context: Option<(String, Vec<String>)>,
    warnings: Vec<DiffWarning>,
    /// Tables that have already produced a warning (map used as a set).
    warned_tables: AHashMap<String, ()>,
    /// Glob patterns compiled from `options.ignore_columns`.
    ignore_patterns: Vec<Pattern>,
    /// Cache of ignored column indices per lowercase table name.
    ignore_indices_cache: AHashMap<String, Vec<usize>>,
}
221
impl DataDiffer {
    /// Build a differ from options, compiling the ignore-column globs once.
    pub fn new(options: DataDiffOptions) -> Self {
        let ignore_patterns = parse_ignore_patterns(&options.ignore_columns);
        Self {
            options,
            old_state: AHashMap::new(),
            new_state: AHashMap::new(),
            total_pk_entries: 0,
            global_truncated: false,
            current_copy_context: None,
            warnings: Vec::new(),
            warned_tables: AHashMap::new(),
            ignore_patterns,
            ignore_indices_cache: AHashMap::new(),
        }
    }

    /// Column indices of `table_name` that match an ignore pattern.
    ///
    /// Results are cached per lowercase table name, so the scan (and the
    /// warning below) happens at most once per table. If an ignored column
    /// is part of the primary key, a one-time warning is emitted because
    /// that can skew diff accuracy.
    fn get_ignore_indices(
        &mut self,
        table_name: &str,
        table_schema: &crate::schema::TableSchema,
    ) -> Vec<usize> {
        let table_lower = table_name.to_lowercase();
        if let Some(indices) = self.ignore_indices_cache.get(&table_lower) {
            return indices.clone();
        }

        // Declared primary-key column indices, used only to detect the
        // "ignoring a PK column" situation below.
        let pk_indices: Vec<usize> = table_schema
            .primary_key
            .iter()
            .map(|col_id| col_id.0 as usize)
            .collect();

        let mut indices: Vec<usize> = Vec::new();
        for (i, col) in table_schema.columns.iter().enumerate() {
            if should_ignore_column(table_name, &col.name, &self.ignore_patterns) {
                if pk_indices.contains(&i) && !self.warned_tables.contains_key(&table_lower) {
                    self.warnings.push(DiffWarning {
                        table: Some(table_name.to_string()),
                        message: format!(
                            "Ignoring primary key column '{}' may affect diff accuracy",
                            col.name
                        ),
                    });
                }
                indices.push(i);
            }
        }

        self.ignore_indices_cache
            .insert(table_lower, indices.clone());
        indices
    }

    /// Resolve the primary-key column indices to use for `table_name`.
    ///
    /// Returns `(indices, has_override, invalid_override_columns)`:
    /// a user-supplied override (matched case-insensitively against the
    /// schema's column names) takes precedence over the declared PK; any
    /// override columns not found in the schema are reported back so the
    /// caller can warn about them.
    fn get_effective_pk_indices(
        &self,
        table_name: &str,
        table_schema: &crate::schema::TableSchema,
    ) -> (Vec<usize>, bool, Vec<String>) {
        if let Some(override_cols) = self.options.pk_overrides.get(&table_name.to_lowercase()) {
            let mut indices: Vec<usize> = Vec::new();
            let mut invalid_cols: Vec<String> = Vec::new();

            for col_name in override_cols {
                if let Some(idx) = table_schema
                    .columns
                    .iter()
                    .position(|c| c.name.eq_ignore_ascii_case(col_name))
                {
                    indices.push(idx);
                } else {
                    invalid_cols.push(col_name.clone());
                }
            }

            (indices, true, invalid_cols)
        } else {
            let indices: Vec<usize> = table_schema
                .primary_key
                .iter()
                .map(|col_id| col_id.0 as usize)
                .collect();
            (indices, false, Vec::new())
        }
    }

    /// Extract the PK values at `pk_indices` from a full row.
    ///
    /// Returns `None` when no usable key can be formed: empty index list,
    /// an index out of range for this row, or a NULL in any key column
    /// (NULL keys cannot be matched reliably across dumps).
    fn extract_pk_from_values(
        &self,
        all_values: &[mysql_insert::PkValue],
        pk_indices: &[usize],
    ) -> Option<smallvec::SmallVec<[mysql_insert::PkValue; 2]>> {
        if pk_indices.is_empty() {
            return None;
        }
        let mut pk_values: smallvec::SmallVec<[mysql_insert::PkValue; 2]> =
            smallvec::SmallVec::new();
        for &idx in pk_indices {
            if let Some(val) = all_values.get(idx) {
                if val.is_null() {
                    return None;
                }
                pk_values.push(val.clone());
            } else {
                return None;
            }
        }
        if pk_values.is_empty() {
            None
        } else {
            Some(pk_values)
        }
    }

    /// Scan one dump file and record its rows into the old or new state.
    ///
    /// `byte_offset` / `total_bytes` let the optional progress callback
    /// report overall progress when several files are scanned in sequence.
    /// For Postgres dumps, COPY statements set `current_copy_context` and
    /// the subsequent data block (terminated by `\.`) is parsed against it.
    #[allow(clippy::too_many_arguments)]
    pub fn scan_file(
        &mut self,
        path: &PathBuf,
        schema: &Schema,
        dialect: SqlDialect,
        is_old: bool,
        progress_fn: &Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
        byte_offset: u64,
        total_bytes: u64,
    ) -> anyhow::Result<()> {
        let file = File::open(path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);
        let compression = Compression::from_path(path);

        // Optionally wrap the file in a progress-reporting reader, then in
        // the decompressor matching the path's extension.
        let reader: Box<dyn Read> = if let Some(ref cb) = progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                cb(byte_offset + bytes, total_bytes);
            });
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, dialect);

        // Reset any COPY context left over from a previous file.
        self.current_copy_context = None;

        while let Some(stmt) = parser.read_statement()? {
            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);

            // In Postgres dumps, the COPY data block arrives as an Unknown
            // statement ending with the `\.` terminator; route it to the
            // table recorded by the preceding COPY statement.
            if dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                if stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n") {
                    if let Some((ref copy_table, ref column_order)) =
                        self.current_copy_context.clone()
                    {
                        if should_include_table(
                            copy_table,
                            &self.options.tables,
                            &self.options.exclude,
                        ) {
                            if let Some(table_schema) = schema.get_table(copy_table) {
                                let has_pk = !table_schema.primary_key.is_empty();
                                let has_pk_override = self
                                    .options
                                    .pk_overrides
                                    .contains_key(&copy_table.to_lowercase());
                                if has_pk || self.options.allow_no_pk || has_pk_override {
                                    self.process_copy_data(
                                        &stmt,
                                        copy_table,
                                        table_schema,
                                        column_order.clone(),
                                        is_old,
                                    )?;
                                } else if !self.warned_tables.contains_key(copy_table) {
                                    self.warned_tables.insert(copy_table.clone(), ());
                                    self.warnings.push(DiffWarning {
                                        table: Some(copy_table.clone()),
                                        message: "No primary key, data comparison skipped"
                                            .to_string(),
                                    });
                                }
                            }
                        }
                    }
                }
                // Any Unknown statement ends the COPY block, matched or not.
                self.current_copy_context = None;
                continue;
            }

            if table_name.is_empty() {
                continue;
            }

            if !should_include_table(&table_name, &self.options.tables, &self.options.exclude) {
                continue;
            }

            // Skip statements for tables not present in the parsed schema.
            let table_schema = match schema.get_table(&table_name) {
                Some(t) => t,
                None => continue,
            };

            let has_pk_override = self
                .options
                .pk_overrides
                .contains_key(&table_name.to_lowercase());
            if table_schema.primary_key.is_empty() && !self.options.allow_no_pk && !has_pk_override
            {
                // Warn once per table, then skip its data entirely.
                if !self.warned_tables.contains_key(&table_name) {
                    self.warned_tables.insert(table_name.clone(), ());
                    self.warnings.push(DiffWarning {
                        table: Some(table_name.clone()),
                        message: "No primary key, data comparison skipped".to_string(),
                    });
                }
                continue;
            }

            match stmt_type {
                StatementType::Insert => {
                    self.process_insert_statement(&stmt, &table_name, table_schema, is_old)?;
                }
                StatementType::Copy => {
                    // Remember the COPY target and its column order so the
                    // following data block can be parsed against it.
                    let header = String::from_utf8_lossy(&stmt);
                    let column_order = postgres_copy::parse_copy_columns(&header);
                    self.current_copy_context = Some((table_name.clone(), column_order));
                }
                _ => {}
            }
        }

        Ok(())
    }

    /// Parse a MySQL INSERT statement and record each of its rows.
    ///
    /// Warns once per table when a PK override names columns missing from
    /// the schema; rows are still recorded using whatever indices resolved.
    fn process_insert_statement(
        &mut self,
        stmt: &[u8],
        table_name: &str,
        table_schema: &crate::schema::TableSchema,
        is_old: bool,
    ) -> anyhow::Result<()> {
        let rows = mysql_insert::parse_mysql_insert_rows(stmt, table_schema)?;

        let (pk_indices, has_override, invalid_cols) =
            self.get_effective_pk_indices(table_name, table_schema);

        let ignore_indices = self.get_ignore_indices(table_name, table_schema);

        if !invalid_cols.is_empty() && !self.warned_tables.contains_key(table_name) {
            self.warned_tables.insert(table_name.to_string(), ());
            self.warnings.push(DiffWarning {
                table: Some(table_name.to_string()),
                message: format!(
                    "Primary key override column(s) not found: {}",
                    invalid_cols.join(", ")
                ),
            });
        }

        for row in rows {
            // With an override, re-extract the key from the full row;
            // otherwise use the PK the row parser already produced.
            let effective_pk = if has_override {
                self.extract_pk_from_values(&row.all_values, &pk_indices)
            } else {
                row.pk
            };
            self.record_row(
                table_name,
                &effective_pk,
                &row.all_values,
                is_old,
                &ignore_indices,
            );
        }

        Ok(())
    }

    /// Parse a Postgres COPY data block and record each of its rows.
    ///
    /// Mirrors `process_insert_statement`, but first strips the leading
    /// whitespace/newlines carried over from the statement buffer.
    fn process_copy_data(
        &mut self,
        data_stmt: &[u8],
        table_name: &str,
        table_schema: &crate::schema::TableSchema,
        column_order: Vec<String>,
        is_old: bool,
    ) -> anyhow::Result<()> {
        // Drop leading whitespace so the row parser sees data immediately.
        let data = data_stmt
            .iter()
            .skip_while(|&&b| b == b'\n' || b == b'\r' || b == b' ' || b == b'\t')
            .cloned()
            .collect::<Vec<u8>>();

        if data.is_empty() {
            return Ok(());
        }

        let rows = postgres_copy::parse_postgres_copy_rows(&data, table_schema, column_order)?;

        let (pk_indices, has_override, invalid_cols) =
            self.get_effective_pk_indices(table_name, table_schema);

        let ignore_indices = self.get_ignore_indices(table_name, table_schema);

        if !invalid_cols.is_empty() && !self.warned_tables.contains_key(table_name) {
            self.warned_tables.insert(table_name.to_string(), ());
            self.warnings.push(DiffWarning {
                table: Some(table_name.to_string()),
                message: format!(
                    "Primary key override column(s) not found: {}",
                    invalid_cols.join(", ")
                ),
            });
        }

        for row in rows {
            // With an override, re-extract the key from the full row;
            // otherwise use the PK the row parser already produced.
            let effective_pk = if has_override {
                self.extract_pk_from_values(&row.all_values, &pk_indices)
            } else {
                row.pk
            };
            self.record_row(
                table_name,
                &effective_pk,
                &row.all_values,
                is_old,
                &ignore_indices,
            );
        }

        Ok(())
    }

    /// Record one row into the per-table state for the old or new side.
    ///
    /// Enforces both size caps: once the per-table or global PK-entry limit
    /// is reached, digest/sample maps are dropped and only row counts keep
    /// accumulating (the table is marked truncated). Rows without a usable
    /// PK contribute to the row count only.
    fn record_row(
        &mut self,
        table_name: &str,
        pk: &Option<smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
        all_values: &[mysql_insert::PkValue],
        is_old: bool,
        ignore_indices: &[usize],
    ) {
        // After global truncation, new tables start in count-only mode.
        if self.global_truncated {
            let state = if is_old {
                self.old_state
                    .entry(table_name.to_string())
                    .or_insert_with(|| {
                        let mut s = TableState::new();
                        s.pk_to_digest = None;
                        s.truncated = true;
                        s
                    })
            } else {
                self.new_state
                    .entry(table_name.to_string())
                    .or_insert_with(|| {
                        let mut s = TableState::new();
                        s.pk_to_digest = None;
                        s.truncated = true;
                        s
                    })
            };
            state.row_count += 1;
            return;
        }

        // Only allocate the PK-string map when sampling is requested.
        let sample_size = self.options.sample_size;
        let state = if is_old {
            self.old_state
                .entry(table_name.to_string())
                .or_insert_with(|| {
                    if sample_size > 0 {
                        TableState::new_with_pk_strings()
                    } else {
                        TableState::new()
                    }
                })
        } else {
            self.new_state
                .entry(table_name.to_string())
                .or_insert_with(|| {
                    if sample_size > 0 {
                        TableState::new_with_pk_strings()
                    } else {
                        TableState::new()
                    }
                })
        };

        state.row_count += 1;

        // Per-table cap: drop this table's maps and count rows only.
        if let Some(ref map) = state.pk_to_digest {
            if map.len() >= self.options.max_pk_entries_per_table {
                state.pk_to_digest = None;
                state.pk_strings = None;
                state.truncated = true;
                return;
            }
        }

        // Global cap: flip the global flag; subsequent rows take the
        // count-only fast path at the top of this function.
        if self.total_pk_entries >= self.options.max_pk_entries_global {
            self.global_truncated = true;
            state.pk_to_digest = None;
            state.pk_strings = None;
            state.truncated = true;
            return;
        }

        if let Some(ref pk_values) = pk {
            if let Some(ref mut map) = state.pk_to_digest {
                let pk_hash = hash_pk_values(pk_values);
                let row_digest = if ignore_indices.is_empty() {
                    hash_row_digest(all_values)
                } else {
                    hash_row_digest_with_ignore(all_values, ignore_indices)
                };
                // Count only first-time PKs toward the global budget;
                // a duplicate PK overwrites the previous digest.
                let was_new = map.insert(pk_hash, row_digest).is_none();
                if was_new {
                    self.total_pk_entries += 1;
                }

                if let Some(ref mut pk_str_map) = state.pk_strings {
                    pk_str_map.insert(pk_hash, format_pk_value(pk_values));
                }
            }
        }
    }

    /// Consume the differ and produce the final diff plus collected warnings.
    ///
    /// When both sides have digest maps, rows are matched by PK hash to
    /// classify added/removed/modified (with optional PK samples). When
    /// either side lacks a map (truncated or never tracked), added/removed
    /// counts are approximated from the row-count delta.
    pub fn compute_diff(self) -> (DataDiff, Vec<DiffWarning>) {
        let mut tables: HashMap<String, TableDataDiff> = HashMap::new();
        let sample_size = self.options.sample_size;

        // Union of table names seen on either side, old side first.
        let mut all_tables: Vec<String> = self.old_state.keys().cloned().collect();
        for name in self.new_state.keys() {
            if !all_tables.contains(name) {
                all_tables.push(name.clone());
            }
        }

        for table_name in all_tables {
            let old_state = self.old_state.get(&table_name);
            let new_state = self.new_state.get(&table_name);

            let mut diff = TableDataDiff {
                old_row_count: old_state.map(|s| s.row_count).unwrap_or(0),
                new_row_count: new_state.map(|s| s.row_count).unwrap_or(0),
                truncated: old_state.map(|s| s.truncated).unwrap_or(false)
                    || new_state.map(|s| s.truncated).unwrap_or(false)
                    || self.global_truncated,
                ..Default::default()
            };

            let old_map = old_state.and_then(|s| s.pk_to_digest.as_ref());
            let new_map = new_state.and_then(|s| s.pk_to_digest.as_ref());

            let old_pk_strings = old_state.and_then(|s| s.pk_strings.as_ref());
            let new_pk_strings = new_state.and_then(|s| s.pk_strings.as_ref());

            match (old_map, new_map) {
                (Some(old), Some(new)) => {
                    // Added: PKs only present on the new side.
                    for pk_hash in new.keys() {
                        if !old.contains_key(pk_hash) {
                            diff.added_count += 1;

                            if sample_size > 0 && diff.sample_added_pks.len() < sample_size {
                                if let Some(pk_str) = new_pk_strings.and_then(|m| m.get(pk_hash)) {
                                    diff.sample_added_pks.push(pk_str.clone());
                                }
                            }
                        }
                    }

                    // Removed: PK missing on the new side; modified: digest changed.
                    for (pk_hash, old_digest) in old {
                        match new.get(pk_hash) {
                            None => {
                                diff.removed_count += 1;

                                if sample_size > 0 && diff.sample_removed_pks.len() < sample_size {
                                    if let Some(pk_str) =
                                        old_pk_strings.and_then(|m| m.get(pk_hash))
                                    {
                                        diff.sample_removed_pks.push(pk_str.clone());
                                    }
                                }
                            }
                            Some(new_digest) => {
                                if old_digest != new_digest {
                                    diff.modified_count += 1;

                                    if sample_size > 0
                                        && diff.sample_modified_pks.len() < sample_size
                                    {
                                        if let Some(pk_str) =
                                            old_pk_strings.and_then(|m| m.get(pk_hash))
                                        {
                                            diff.sample_modified_pks.push(pk_str.clone());
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                _ => {
                    // No digests available: approximate from row counts only.
                    if diff.new_row_count > diff.old_row_count {
                        diff.added_count = diff.new_row_count - diff.old_row_count;
                    } else if diff.old_row_count > diff.new_row_count {
                        diff.removed_count = diff.old_row_count - diff.new_row_count;
                    }
                }
            }

            // Only report tables that showed any rows or changes at all.
            if diff.old_row_count > 0
                || diff.new_row_count > 0
                || diff.added_count > 0
                || diff.removed_count > 0
                || diff.modified_count > 0
            {
                tables.insert(table_name, diff);
            }
        }

        (DataDiff { tables }, self.warnings)
    }
}