Skip to main content

sql_splitter/differ/
data.rs

1//! Data comparison for diff command.
2//!
3//! Uses streaming with memory-bounded PK tracking to compare row-level
4//! differences between two SQL dumps.
5
6use super::{parse_ignore_patterns, should_ignore_column, should_include_table, DiffWarning};
7use crate::parser::{
8    determine_buffer_size, mysql_insert, postgres_copy, Parser, SqlDialect, StatementType,
9};
10use crate::pk::{hash_pk_values, PkHash};
11use crate::progress::ProgressReader;
12use crate::schema::Schema;
13use crate::splitter::Compression;
14use ahash::AHashMap;
15use glob::Pattern;
16use serde::Serialize;
17use std::collections::HashMap;
18use std::fs::File;
19use std::hash::{Hash, Hasher};
20use std::io::Read;
21use std::path::PathBuf;
22use std::sync::Arc;
23
/// Options for data comparison
#[derive(Debug, Clone)]
pub struct DataDiffOptions {
    /// Maximum PK entries to track globally across all tables; once exceeded,
    /// per-row tracking stops everywhere and only row counts are compared
    pub max_pk_entries_global: usize,
    /// Maximum PK entries to track for a single table before that table's
    /// tracking is dropped and it is reported as truncated
    pub max_pk_entries_per_table: usize,
    /// Number of sample rows to collect for verbose mode (0 disables sampling)
    pub sample_size: usize,
    /// Tables to include (if empty, include all)
    pub tables: Vec<String>,
    /// Tables to exclude
    pub exclude: Vec<String>,
    /// Don't skip tables without PK, use all columns as key
    pub allow_no_pk: bool,
    /// Primary key overrides: table name -> column names
    /// (lookups lowercase the table name, so keys should be lowercase)
    pub pk_overrides: std::collections::HashMap<String, Vec<String>>,
    /// Column patterns to ignore (glob format: table.column)
    pub ignore_columns: Vec<String>,
}
44
45impl Default for DataDiffOptions {
46    fn default() -> Self {
47        Self {
48            max_pk_entries_global: 10_000_000,
49            max_pk_entries_per_table: 5_000_000,
50            sample_size: 0,
51            tables: Vec::new(),
52            exclude: Vec::new(),
53            allow_no_pk: false,
54            pk_overrides: std::collections::HashMap::new(),
55            ignore_columns: Vec::new(),
56        }
57    }
58}
59
/// Complete data diff result
#[derive(Debug, Serialize)]
pub struct DataDiff {
    /// Per-table data differences, keyed by table name; only tables that had
    /// rows (or detected changes) in either file are included
    pub tables: HashMap<String, TableDataDiff>,
}
66
/// Data differences for a single table
#[derive(Debug, Serialize, Clone, Default)]
pub struct TableDataDiff {
    /// Row count in old file
    pub old_row_count: u64,
    /// Row count in new file
    pub new_row_count: u64,
    /// Number of rows added (in new but not old); when PK tracking was
    /// dropped, this falls back to the positive row-count delta
    pub added_count: u64,
    /// Number of rows removed (in old but not new); when PK tracking was
    /// dropped, this falls back to the negative row-count delta
    pub removed_count: u64,
    /// Number of rows modified (same PK, different content)
    pub modified_count: u64,
    /// Whether tracking was truncated due to memory limits
    pub truncated: bool,
    /// Sample PKs for added rows (only when verbose)
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub sample_added_pks: Vec<String>,
    /// Sample PKs for removed rows (only when verbose)
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub sample_removed_pks: Vec<String>,
    /// Sample PKs for modified rows (only when verbose)
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub sample_modified_pks: Vec<String>,
}
92
/// Per-table state during data scanning
struct TableState {
    /// Row count seen
    row_count: u64,
    /// Map from PK hash to row digest (for detecting modifications).
    /// None if tracking was disabled due to memory limits.
    /// NOTE: digests are 64-bit hashes, so modification detection is
    /// probabilistic — a hash collision reads as "unchanged".
    pk_to_digest: Option<AHashMap<PkHash, u64>>,
    /// Map from PK hash to formatted PK string (only when sample_size > 0)
    pk_strings: Option<AHashMap<PkHash, String>>,
    /// Whether tracking was truncated
    truncated: bool,
}
105
106impl TableState {
107    fn new() -> Self {
108        Self {
109            row_count: 0,
110            pk_to_digest: Some(AHashMap::new()),
111            pk_strings: None,
112            truncated: false,
113        }
114    }
115
116    fn new_with_pk_strings() -> Self {
117        Self {
118            row_count: 0,
119            pk_to_digest: Some(AHashMap::new()),
120            pk_strings: Some(AHashMap::new()),
121            truncated: false,
122        }
123    }
124}
125
126/// Hash non-PK column values to detect row modifications
127fn hash_row_digest(values: &[mysql_insert::PkValue]) -> u64 {
128    let mut hasher = ahash::AHasher::default();
129    for v in values {
130        match v {
131            mysql_insert::PkValue::Int(i) => {
132                0u8.hash(&mut hasher);
133                i.hash(&mut hasher);
134            }
135            mysql_insert::PkValue::BigInt(i) => {
136                1u8.hash(&mut hasher);
137                i.hash(&mut hasher);
138            }
139            mysql_insert::PkValue::Text(s) => {
140                2u8.hash(&mut hasher);
141                s.hash(&mut hasher);
142            }
143            mysql_insert::PkValue::Null => {
144                3u8.hash(&mut hasher);
145            }
146        }
147    }
148    hasher.finish()
149}
150
151/// Format a single PK value as a string
152fn format_single_pk(v: &mysql_insert::PkValue) -> String {
153    match v {
154        mysql_insert::PkValue::Int(i) => i.to_string(),
155        mysql_insert::PkValue::BigInt(i) => i.to_string(),
156        mysql_insert::PkValue::Text(s) => s.to_string(),
157        mysql_insert::PkValue::Null => "NULL".to_string(),
158    }
159}
160
161/// Format a PK tuple as a string (single value as-is, composite as "(val1, val2)")
162fn format_pk_value(pk: &[mysql_insert::PkValue]) -> String {
163    if pk.len() == 1 {
164        format_single_pk(&pk[0])
165    } else {
166        let parts: Vec<String> = pk.iter().map(format_single_pk).collect();
167        format!("({})", parts.join(", "))
168    }
169}
170
171/// Hash non-PK column values to detect row modifications, excluding ignored column indices
172fn hash_row_digest_with_ignore(values: &[mysql_insert::PkValue], ignore_indices: &[usize]) -> u64 {
173    let mut hasher = ahash::AHasher::default();
174    for (i, v) in values.iter().enumerate() {
175        if ignore_indices.contains(&i) {
176            continue;
177        }
178        match v {
179            mysql_insert::PkValue::Int(val) => {
180                0u8.hash(&mut hasher);
181                val.hash(&mut hasher);
182            }
183            mysql_insert::PkValue::BigInt(val) => {
184                1u8.hash(&mut hasher);
185                val.hash(&mut hasher);
186            }
187            mysql_insert::PkValue::Text(s) => {
188                2u8.hash(&mut hasher);
189                s.hash(&mut hasher);
190            }
191            mysql_insert::PkValue::Null => {
192                3u8.hash(&mut hasher);
193            }
194        }
195    }
196    hasher.finish()
197}
198
/// Data differ engine that accumulates state across file scans
pub struct DataDiffer {
    options: DataDiffOptions,
    /// State for old file: table -> (pk_hash -> row_digest)
    old_state: AHashMap<String, TableState>,
    /// State for new file: table -> (pk_hash -> row_digest)
    new_state: AHashMap<String, TableState>,
    /// Total distinct PK entries tracked across both files (memory guard)
    total_pk_entries: usize,
    /// Whether global memory limit was exceeded; once set, PK tracking is
    /// disabled for all subsequently scanned rows
    global_truncated: bool,
    /// Current COPY context for PostgreSQL: (table_name, column_order);
    /// set when a COPY header is seen, consumed by the following data statement
    current_copy_context: Option<(String, Vec<String>)>,
    /// Warnings generated during diff
    warnings: Vec<DiffWarning>,
    /// Tables already warned about (to avoid duplicate warnings); used as a set.
    /// NOTE(review): keys are raw table names at some warn sites but the
    /// lowercased name is checked in get_ignore_indices — confirm intended.
    warned_tables: AHashMap<String, ()>,
    /// Compiled ignore column patterns
    ignore_patterns: Vec<Pattern>,
    /// Cache of ignored column indices per table (keyed by lowercase name)
    ignore_indices_cache: AHashMap<String, Vec<usize>>,
}
221
222impl DataDiffer {
223    /// Create a new data differ
224    pub fn new(options: DataDiffOptions) -> Self {
225        let ignore_patterns = parse_ignore_patterns(&options.ignore_columns);
226        Self {
227            options,
228            old_state: AHashMap::new(),
229            new_state: AHashMap::new(),
230            total_pk_entries: 0,
231            global_truncated: false,
232            current_copy_context: None,
233            warnings: Vec::new(),
234            warned_tables: AHashMap::new(),
235            ignore_patterns,
236            ignore_indices_cache: AHashMap::new(),
237        }
238    }
239
    /// Get ignored column indices for a table
    ///
    /// Matches every column of `table_schema` against the compiled
    /// `ignore_patterns` and returns the positions to skip when hashing rows.
    /// Results are memoized per lowercased table name, so the pattern
    /// matching (and the PK warning below) runs at most once per table.
    fn get_ignore_indices(
        &mut self,
        table_name: &str,
        table_schema: &crate::schema::TableSchema,
    ) -> Vec<usize> {
        let table_lower = table_name.to_lowercase();
        if let Some(indices) = self.ignore_indices_cache.get(&table_lower) {
            return indices.clone();
        }

        // Get PK column indices
        let pk_indices: Vec<usize> = table_schema
            .primary_key
            .iter()
            .map(|col_id| col_id.0 as usize)
            .collect();

        let mut indices: Vec<usize> = Vec::new();
        for (i, col) in table_schema.columns.iter().enumerate() {
            if should_ignore_column(table_name, &col.name, &self.ignore_patterns) {
                // Warn if trying to ignore a PK column (but still allow it for non-PK uses)
                // NOTE(review): warned_tables is checked with the lowercased
                // name here while other warn sites key by the raw name, and
                // nothing is inserted — deduplication relies solely on the
                // cache hit above. Confirm the key casing is intentional.
                if pk_indices.contains(&i) && !self.warned_tables.contains_key(&table_lower) {
                    self.warnings.push(DiffWarning {
                        table: Some(table_name.to_string()),
                        message: format!(
                            "Ignoring primary key column '{}' may affect diff accuracy",
                            col.name
                        ),
                    });
                }
                indices.push(i);
            }
        }

        self.ignore_indices_cache
            .insert(table_lower, indices.clone());
        indices
    }
279
280    /// Get effective PK column indices for a table, considering overrides
281    /// Returns (indices, has_override, invalid_columns) tuple
282    fn get_effective_pk_indices(
283        &self,
284        table_name: &str,
285        table_schema: &crate::schema::TableSchema,
286    ) -> (Vec<usize>, bool, Vec<String>) {
287        if let Some(override_cols) = self.options.pk_overrides.get(&table_name.to_lowercase()) {
288            let mut indices: Vec<usize> = Vec::new();
289            let mut invalid_cols: Vec<String> = Vec::new();
290
291            for col_name in override_cols {
292                if let Some(idx) = table_schema
293                    .columns
294                    .iter()
295                    .position(|c| c.name.eq_ignore_ascii_case(col_name))
296                {
297                    indices.push(idx);
298                } else {
299                    invalid_cols.push(col_name.clone());
300                }
301            }
302
303            (indices, true, invalid_cols)
304        } else {
305            let indices: Vec<usize> = table_schema
306                .primary_key
307                .iter()
308                .map(|col_id| col_id.0 as usize)
309                .collect();
310            (indices, false, Vec::new())
311        }
312    }
313
314    /// Extract PK from all_values using the given column indices
315    fn extract_pk_from_values(
316        &self,
317        all_values: &[mysql_insert::PkValue],
318        pk_indices: &[usize],
319    ) -> Option<smallvec::SmallVec<[mysql_insert::PkValue; 2]>> {
320        if pk_indices.is_empty() {
321            return None;
322        }
323        let mut pk_values: smallvec::SmallVec<[mysql_insert::PkValue; 2]> =
324            smallvec::SmallVec::new();
325        for &idx in pk_indices {
326            if let Some(val) = all_values.get(idx) {
327                if val.is_null() {
328                    return None;
329                }
330                pk_values.push(val.clone());
331            } else {
332                return None;
333            }
334        }
335        if pk_values.is_empty() {
336            None
337        } else {
338            Some(pk_values)
339        }
340    }
341
    /// Scan a SQL file and accumulate PK/digest state
    ///
    /// Streams statements from `path` (transparently decompressing based on
    /// the file extension) and records per-table PK hashes and row digests
    /// into either the old- or new-file state, selected by `is_old`.
    ///
    /// `progress_fn`, when set, is called with (`byte_offset` + compressed
    /// bytes read so far, `total_bytes`), letting the caller report combined
    /// progress across multiple files.
    #[allow(clippy::too_many_arguments)]
    pub fn scan_file(
        &mut self,
        path: &PathBuf,
        schema: &Schema,
        dialect: SqlDialect,
        is_old: bool,
        progress_fn: &Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
        byte_offset: u64,
        total_bytes: u64,
    ) -> anyhow::Result<()> {
        let file = File::open(path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);
        let compression = Compression::from_path(path);

        // Wrap the raw file in the progress reader *before* decompression so
        // progress reflects on-disk (compressed) bytes.
        let reader: Box<dyn Read> = if let Some(ref cb) = progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                cb(byte_offset + bytes, total_bytes);
            });
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };

        let mut parser = Parser::with_dialect(reader, buffer_size, dialect);

        // Reset COPY context for this file scan
        self.current_copy_context = None;

        while let Some(stmt) = parser.read_statement()? {
            let (stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);

            // Handle PostgreSQL COPY data (separate statement from header)
            if dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                // Check if this looks like COPY data (ends with \.)
                if stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n") {
                    if let Some((ref copy_table, ref column_order)) =
                        self.current_copy_context.clone()
                    {
                        // Check table filter
                        if should_include_table(
                            copy_table,
                            &self.options.tables,
                            &self.options.exclude,
                        ) {
                            if let Some(table_schema) = schema.get_table(copy_table) {
                                let has_pk = !table_schema.primary_key.is_empty();
                                let has_pk_override = self
                                    .options
                                    .pk_overrides
                                    .contains_key(&copy_table.to_lowercase());
                                // Without a declared or overridden PK, rows are
                                // only comparable when allow_no_pk is set.
                                if has_pk || self.options.allow_no_pk || has_pk_override {
                                    self.process_copy_data(
                                        &stmt,
                                        copy_table,
                                        table_schema,
                                        column_order.clone(),
                                        is_old,
                                    )?;
                                } else if !self.warned_tables.contains_key(copy_table) {
                                    self.warned_tables.insert(copy_table.clone(), ());
                                    self.warnings.push(DiffWarning {
                                        table: Some(copy_table.clone()),
                                        message: "No primary key, data comparison skipped"
                                            .to_string(),
                                    });
                                }
                            }
                        }
                    }
                }
                // COPY data consumes its context; any other Unknown statement
                // also invalidates it so stale context can't leak forward.
                self.current_copy_context = None;
                continue;
            }

            if table_name.is_empty() {
                continue;
            }

            // Check table filter
            if !should_include_table(&table_name, &self.options.tables, &self.options.exclude) {
                continue;
            }

            // Get table schema for PK info
            let table_schema = match schema.get_table(&table_name) {
                Some(t) => t,
                None => continue,
            };

            // Handle tables without primary key (unless there's an override)
            let has_pk_override = self
                .options
                .pk_overrides
                .contains_key(&table_name.to_lowercase());
            if table_schema.primary_key.is_empty() && !self.options.allow_no_pk && !has_pk_override
            {
                // Emit warning once per table
                if !self.warned_tables.contains_key(&table_name) {
                    self.warned_tables.insert(table_name.clone(), ());
                    self.warnings.push(DiffWarning {
                        table: Some(table_name.clone()),
                        message: "No primary key, data comparison skipped".to_string(),
                    });
                }
                continue;
                // allow_no_pk is true - we'll use all columns as key (handled in process_*)
            }

            match stmt_type {
                StatementType::Insert => {
                    self.process_insert_statement(&stmt, &table_name, table_schema, is_old)?;
                }
                StatementType::Copy => {
                    // For PostgreSQL COPY, the data comes in the next statement
                    // Save context for processing the data statement
                    let header = String::from_utf8_lossy(&stmt);
                    let column_order = postgres_copy::parse_copy_columns(&header);
                    self.current_copy_context = Some((table_name.clone(), column_order));
                }
                _ => {}
            }
        }

        Ok(())
    }
472
473    /// Process an INSERT statement
474    fn process_insert_statement(
475        &mut self,
476        stmt: &[u8],
477        table_name: &str,
478        table_schema: &crate::schema::TableSchema,
479        is_old: bool,
480    ) -> anyhow::Result<()> {
481        let rows = mysql_insert::parse_mysql_insert_rows(stmt, table_schema)?;
482
483        let (pk_indices, has_override, invalid_cols) =
484            self.get_effective_pk_indices(table_name, table_schema);
485
486        // Get ignore indices for this table
487        let ignore_indices = self.get_ignore_indices(table_name, table_schema);
488
489        // Warn about invalid override columns (once per table)
490        if !invalid_cols.is_empty() && !self.warned_tables.contains_key(table_name) {
491            self.warned_tables.insert(table_name.to_string(), ());
492            self.warnings.push(DiffWarning {
493                table: Some(table_name.to_string()),
494                message: format!(
495                    "Primary key override column(s) not found: {}",
496                    invalid_cols.join(", ")
497                ),
498            });
499        }
500
501        for row in rows {
502            let effective_pk = if has_override {
503                self.extract_pk_from_values(&row.all_values, &pk_indices)
504            } else {
505                row.pk
506            };
507            self.record_row(
508                table_name,
509                &effective_pk,
510                &row.all_values,
511                is_old,
512                &ignore_indices,
513            );
514        }
515
516        Ok(())
517    }
518
519    /// Process PostgreSQL COPY data (the data lines after the COPY header)
520    fn process_copy_data(
521        &mut self,
522        data_stmt: &[u8],
523        table_name: &str,
524        table_schema: &crate::schema::TableSchema,
525        column_order: Vec<String>,
526        is_old: bool,
527    ) -> anyhow::Result<()> {
528        // The data_stmt contains the raw COPY data lines (may have leading newline)
529        // Strip leading whitespace/newlines
530        let data = data_stmt
531            .iter()
532            .skip_while(|&&b| b == b'\n' || b == b'\r' || b == b' ' || b == b'\t')
533            .cloned()
534            .collect::<Vec<u8>>();
535
536        if data.is_empty() {
537            return Ok(());
538        }
539
540        let rows = postgres_copy::parse_postgres_copy_rows(&data, table_schema, column_order)?;
541
542        let (pk_indices, has_override, invalid_cols) =
543            self.get_effective_pk_indices(table_name, table_schema);
544
545        // Get ignore indices for this table
546        let ignore_indices = self.get_ignore_indices(table_name, table_schema);
547
548        // Warn about invalid override columns (once per table)
549        if !invalid_cols.is_empty() && !self.warned_tables.contains_key(table_name) {
550            self.warned_tables.insert(table_name.to_string(), ());
551            self.warnings.push(DiffWarning {
552                table: Some(table_name.to_string()),
553                message: format!(
554                    "Primary key override column(s) not found: {}",
555                    invalid_cols.join(", ")
556                ),
557            });
558        }
559
560        for row in rows {
561            let effective_pk = if has_override {
562                self.extract_pk_from_values(&row.all_values, &pk_indices)
563            } else {
564                row.pk
565            };
566            self.record_row(
567                table_name,
568                &effective_pk,
569                &row.all_values,
570                is_old,
571                &ignore_indices,
572            );
573        }
574
575        Ok(())
576    }
577
578    /// Record a row in the appropriate state map
579    fn record_row(
580        &mut self,
581        table_name: &str,
582        pk: &Option<smallvec::SmallVec<[mysql_insert::PkValue; 2]>>,
583        all_values: &[mysql_insert::PkValue],
584        is_old: bool,
585        ignore_indices: &[usize],
586    ) {
587        if self.global_truncated {
588            // Still count rows but don't track PKs
589            let state = if is_old {
590                self.old_state
591                    .entry(table_name.to_string())
592                    .or_insert_with(|| {
593                        let mut s = TableState::new();
594                        s.pk_to_digest = None;
595                        s.truncated = true;
596                        s
597                    })
598            } else {
599                self.new_state
600                    .entry(table_name.to_string())
601                    .or_insert_with(|| {
602                        let mut s = TableState::new();
603                        s.pk_to_digest = None;
604                        s.truncated = true;
605                        s
606                    })
607            };
608            state.row_count += 1;
609            return;
610        }
611
612        let sample_size = self.options.sample_size;
613        let state = if is_old {
614            self.old_state
615                .entry(table_name.to_string())
616                .or_insert_with(|| {
617                    if sample_size > 0 {
618                        TableState::new_with_pk_strings()
619                    } else {
620                        TableState::new()
621                    }
622                })
623        } else {
624            self.new_state
625                .entry(table_name.to_string())
626                .or_insert_with(|| {
627                    if sample_size > 0 {
628                        TableState::new_with_pk_strings()
629                    } else {
630                        TableState::new()
631                    }
632                })
633        };
634
635        state.row_count += 1;
636
637        // Check per-table limit
638        if let Some(ref map) = state.pk_to_digest {
639            if map.len() >= self.options.max_pk_entries_per_table {
640                state.pk_to_digest = None;
641                state.pk_strings = None;
642                state.truncated = true;
643                return;
644            }
645        }
646
647        // Check global limit
648        if self.total_pk_entries >= self.options.max_pk_entries_global {
649            self.global_truncated = true;
650            state.pk_to_digest = None;
651            state.pk_strings = None;
652            state.truncated = true;
653            return;
654        }
655
656        // Record PK and digest
657        if let Some(ref pk_values) = pk {
658            if let Some(ref mut map) = state.pk_to_digest {
659                let pk_hash = hash_pk_values(pk_values);
660                let row_digest = if ignore_indices.is_empty() {
661                    hash_row_digest(all_values)
662                } else {
663                    hash_row_digest_with_ignore(all_values, ignore_indices)
664                };
665                let was_new = map.insert(pk_hash, row_digest).is_none();
666                if was_new {
667                    self.total_pk_entries += 1;
668                }
669
670                // Also store PK string for sampling if enabled
671                if let Some(ref mut pk_str_map) = state.pk_strings {
672                    pk_str_map.insert(pk_hash, format_pk_value(pk_values));
673                }
674            }
675        }
676    }
677
678    /// Compute the final diff from accumulated state
679    pub fn compute_diff(self) -> (DataDiff, Vec<DiffWarning>) {
680        let mut tables: HashMap<String, TableDataDiff> = HashMap::new();
681        let sample_size = self.options.sample_size;
682
683        // Get all table names from both states
684        let mut all_tables: Vec<String> = self.old_state.keys().cloned().collect();
685        for name in self.new_state.keys() {
686            if !all_tables.contains(name) {
687                all_tables.push(name.clone());
688            }
689        }
690
691        for table_name in all_tables {
692            let old_state = self.old_state.get(&table_name);
693            let new_state = self.new_state.get(&table_name);
694
695            let mut diff = TableDataDiff {
696                old_row_count: old_state.map(|s| s.row_count).unwrap_or(0),
697                new_row_count: new_state.map(|s| s.row_count).unwrap_or(0),
698                truncated: old_state.map(|s| s.truncated).unwrap_or(false)
699                    || new_state.map(|s| s.truncated).unwrap_or(false)
700                    || self.global_truncated,
701                ..Default::default()
702            };
703
704            // If we have full PK maps, compute detailed diff
705            let old_map = old_state.and_then(|s| s.pk_to_digest.as_ref());
706            let new_map = new_state.and_then(|s| s.pk_to_digest.as_ref());
707
708            // Get PK string maps for sampling
709            let old_pk_strings = old_state.and_then(|s| s.pk_strings.as_ref());
710            let new_pk_strings = new_state.and_then(|s| s.pk_strings.as_ref());
711
712            match (old_map, new_map) {
713                (Some(old), Some(new)) => {
714                    // Count added (in new but not old) and collect samples
715                    for pk_hash in new.keys() {
716                        if !old.contains_key(pk_hash) {
717                            diff.added_count += 1;
718
719                            // Collect sample PK strings
720                            if sample_size > 0 && diff.sample_added_pks.len() < sample_size {
721                                if let Some(pk_str) = new_pk_strings.and_then(|m| m.get(pk_hash)) {
722                                    diff.sample_added_pks.push(pk_str.clone());
723                                }
724                            }
725                        }
726                    }
727
728                    // Count removed (in old but not new) and modified (same PK, different digest)
729                    for (pk_hash, old_digest) in old {
730                        match new.get(pk_hash) {
731                            None => {
732                                diff.removed_count += 1;
733
734                                // Collect sample PK strings
735                                if sample_size > 0 && diff.sample_removed_pks.len() < sample_size {
736                                    if let Some(pk_str) =
737                                        old_pk_strings.and_then(|m| m.get(pk_hash))
738                                    {
739                                        diff.sample_removed_pks.push(pk_str.clone());
740                                    }
741                                }
742                            }
743                            Some(new_digest) => {
744                                if old_digest != new_digest {
745                                    diff.modified_count += 1;
746
747                                    // Collect sample PK strings
748                                    if sample_size > 0
749                                        && diff.sample_modified_pks.len() < sample_size
750                                    {
751                                        if let Some(pk_str) =
752                                            old_pk_strings.and_then(|m| m.get(pk_hash))
753                                        {
754                                            diff.sample_modified_pks.push(pk_str.clone());
755                                        }
756                                    }
757                                }
758                            }
759                        }
760                    }
761                }
762                _ => {
763                    // Truncated - can only report row count differences
764                    if diff.new_row_count > diff.old_row_count {
765                        diff.added_count = diff.new_row_count - diff.old_row_count;
766                    } else if diff.old_row_count > diff.new_row_count {
767                        diff.removed_count = diff.old_row_count - diff.new_row_count;
768                    }
769                }
770            }
771
772            // Only include tables with changes or both files had data
773            if diff.old_row_count > 0
774                || diff.new_row_count > 0
775                || diff.added_count > 0
776                || diff.removed_count > 0
777                || diff.modified_count > 0
778            {
779                tables.insert(table_name, diff);
780            }
781        }
782
783        (DataDiff { tables }, self.warnings)
784    }
785}