mod data;
mod output;
mod schema;
pub use data::*;
pub use output::*;
pub use schema::*;
use crate::parser::{determine_buffer_size, Parser, SqlDialect, StatementType};
use crate::progress::ProgressReader;
use crate::schema::{Schema, SchemaBuilder};
use crate::splitter::Compression;
use glob::Pattern;
use serde::Serialize;
use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Options controlling a schema/data diff between two SQL dump files.
#[derive(Debug, Clone)]
pub struct DiffConfig {
/// Baseline ("old") dump file.
pub old_path: PathBuf,
/// Updated ("new") dump file.
pub new_path: PathBuf,
/// SQL dialect to parse with; `None` falls back to MySQL (see `Differ::new`).
pub dialect: Option<SqlDialect>,
/// Compare only schemas; skip the row-data pass.
pub schema_only: bool,
/// Compare only row data; skip the schema diff.
pub data_only: bool,
/// Restrict the comparison to these tables (empty = all tables).
pub tables: Vec<String>,
/// Tables to exclude from the comparison.
pub exclude: Vec<String>,
/// Report output format (text, JSON, or SQL).
pub format: DiffOutputFormat,
/// When set, the data pass collects row samples (sample size 100).
pub verbose: bool,
/// Enable progress reporting — not read in this module; presumably consumed
/// by the CLI layer (TODO confirm against callers).
pub progress: bool,
/// Global cap on tracked primary-key entries before diffs are truncated.
pub max_pk_entries: usize,
/// Allow data diffing of tables that have no primary key.
pub allow_no_pk: bool,
/// Ignore column-order differences — forwarded to `compare_schemas` via the
/// whole config; exact semantics live there.
pub ignore_column_order: bool,
/// Per-table primary-key overrides: table name -> list of key columns.
pub pk_overrides: std::collections::HashMap<String, Vec<String>>,
/// Column names/patterns to exclude from data comparison (matched via the
/// glob helpers at the bottom of this file — verify against `DataDiffer`).
pub ignore_columns: Vec<String>,
}
impl Default for DiffConfig {
fn default() -> Self {
Self {
old_path: PathBuf::new(),
new_path: PathBuf::new(),
dialect: None,
schema_only: false,
data_only: false,
tables: Vec::new(),
exclude: Vec::new(),
format: DiffOutputFormat::Text,
verbose: false,
progress: false,
max_pk_entries: 10_000_000, allow_no_pk: false,
ignore_column_order: false,
pk_overrides: std::collections::HashMap::new(),
ignore_columns: Vec::new(),
}
}
}
/// Output format for diff reports; parsed case-insensitively via `FromStr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DiffOutputFormat {
/// Human-readable text (the default).
#[default]
Text,
/// Machine-readable JSON.
Json,
/// SQL statement output (rendering lives in the `output` module).
Sql,
}
impl std::str::FromStr for DiffOutputFormat {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"text" => Ok(Self::Text),
"json" => Ok(Self::Json),
"sql" => Ok(Self::Sql),
_ => Err(format!("Unknown format: {}. Use: text, json, sql", s)),
}
}
}
/// A non-fatal issue raised while diffing (e.g. a table that could not be
/// fully compared).
#[derive(Debug, Serialize, Clone)]
pub struct DiffWarning {
/// Table the warning concerns, if it is table-specific.
#[serde(skip_serializing_if = "Option::is_none")]
pub table: Option<String>,
/// Human-readable description of the issue.
pub message: String,
}
/// Complete outcome of a diff run. Field order is significant for the
/// serialized JSON output — do not reorder.
#[derive(Debug, Serialize)]
pub struct DiffResult {
/// Schema differences; `None` when the run was data-only.
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<SchemaDiff>,
/// Row-data differences; `None` when the run was schema-only.
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<DataDiff>,
/// Non-fatal warnings gathered during the run.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub warnings: Vec<DiffWarning>,
/// Aggregate counts across the schema and data diffs.
pub summary: DiffSummary,
}
/// Aggregate counts for a diff run, built by `Differ::build_summary`.
#[derive(Debug, Serialize)]
pub struct DiffSummary {
/// Tables present only in the new dump.
pub tables_added: usize,
/// Tables present only in the old dump.
pub tables_removed: usize,
/// Modified tables: the max of the schema-level and data-level counts.
pub tables_modified: usize,
/// Total rows added across all tables.
pub rows_added: u64,
/// Total rows removed across all tables.
pub rows_removed: u64,
/// Total rows modified across all tables.
pub rows_modified: u64,
/// True when at least one table's data diff was truncated
/// (see `DiffConfig::max_pk_entries`).
pub truncated: bool,
}
/// Executes a schema/data diff according to a `DiffConfig`.
pub struct Differ {
/// The run configuration.
config: DiffConfig,
/// Resolved dialect: the config's value, or MySQL when unspecified.
dialect: SqlDialect,
/// Optional callback invoked with `(bytes_processed, total_bytes)`.
progress_fn: Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
}
impl Differ {
    /// Creates a differ for `config`, defaulting the dialect to MySQL when
    /// the config does not specify one.
    pub fn new(config: DiffConfig) -> Self {
        Self {
            dialect: config.dialect.unwrap_or(SqlDialect::MySql),
            config,
            progress_fn: None,
        }
    }

    /// Installs a progress callback invoked as `(bytes_processed, total_bytes)`.
    pub fn with_progress<F>(mut self, f: F) -> Self
    where
        F: Fn(u64, u64) + Send + Sync + 'static,
    {
        self.progress_fn = Some(Arc::new(f));
        self
    }

    /// Runs the diff and returns the combined result.
    ///
    /// Schemas are always extracted — the data pass needs them — but the
    /// schema diff is skipped for `data_only` runs and the data pass for
    /// `schema_only` runs. When both passes run, each file is read twice,
    /// so the progress byte budget is doubled.
    ///
    /// # Errors
    /// Returns an error if either input file cannot be stat'ed, opened,
    /// decompressed, or parsed.
    pub fn diff(self) -> anyhow::Result<DiffResult> {
        let old_size = std::fs::metadata(&self.config.old_path)?.len();
        let new_size = std::fs::metadata(&self.config.new_path)?.len();
        // One pass per file for schema-only or data-only; two passes otherwise.
        let total_bytes = if self.config.schema_only || self.config.data_only {
            old_size + new_size
        } else {
            (old_size + new_size) * 2
        };
        // Plain borrows suffice here; `extract_schema` only needs `&Path`
        // (the previous `.clone()` calls were redundant).
        let old_schema = self.extract_schema(&self.config.old_path, 0, total_bytes)?;
        let new_schema = self.extract_schema(&self.config.new_path, old_size, total_bytes)?;
        let schema_diff = if !self.config.data_only {
            Some(compare_schemas(&old_schema, &new_schema, &self.config))
        } else {
            None
        };
        let (data_diff, warnings) = if !self.config.schema_only {
            // Progress for the data pass starts after the schema pass's
            // bytes, unless the schema pass was the only thing skipped.
            let base_offset = if self.config.data_only {
                0
            } else {
                old_size + new_size
            };
            let (data, warns) =
                self.compare_data(&old_schema, &new_schema, old_size, base_offset, total_bytes)?;
            (Some(data), warns)
        } else {
            (None, Vec::new())
        };
        let summary = Self::build_summary(schema_diff.as_ref(), data_diff.as_ref());
        Ok(DiffResult {
            schema: schema_diff,
            data: data_diff,
            warnings,
            summary,
        })
    }

    /// Parses one dump file and builds its `Schema` from CREATE TABLE,
    /// ALTER TABLE, and CREATE INDEX statements; everything else is ignored.
    ///
    /// `byte_offset` is the progress already accumulated by earlier passes;
    /// it is added to this file's running byte count when reporting.
    fn extract_schema(
        &self,
        path: &Path,
        byte_offset: u64,
        total_bytes: u64,
    ) -> anyhow::Result<Schema> {
        let file = File::open(path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = determine_buffer_size(file_size);
        let compression = Compression::from_path(path);
        // The progress reader wraps the raw file *before* decompression, so
        // progress tracks compressed bytes actually read from disk.
        let reader: Box<dyn Read> = if let Some(ref cb) = self.progress_fn {
            let cb = Arc::clone(cb);
            let progress_reader = ProgressReader::new(file, move |bytes| {
                cb(byte_offset + bytes, total_bytes);
            });
            compression.wrap_reader(Box::new(progress_reader))?
        } else {
            compression.wrap_reader(Box::new(file))?
        };
        let mut parser = Parser::with_dialect(reader, buffer_size, self.dialect);
        let mut builder = SchemaBuilder::new();
        while let Some(stmt) = parser.read_statement()? {
            let (stmt_type, _table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.dialect);
            // Non-UTF-8 statements are skipped rather than failing the run.
            match stmt_type {
                StatementType::CreateTable => {
                    if let Ok(stmt_str) = std::str::from_utf8(&stmt) {
                        builder.parse_create_table(stmt_str);
                    }
                }
                StatementType::AlterTable => {
                    if let Ok(stmt_str) = std::str::from_utf8(&stmt) {
                        builder.parse_alter_table(stmt_str);
                    }
                }
                StatementType::CreateIndex => {
                    if let Ok(stmt_str) = std::str::from_utf8(&stmt) {
                        builder.parse_create_index(stmt_str);
                    }
                }
                _ => {}
            }
        }
        Ok(builder.build())
    }

    /// Scans both dumps for row data and computes per-table differences.
    ///
    /// `old_size` is the old dump's byte size, already computed by `diff`
    /// (previously this method re-stat'ed the file); it offsets the new
    /// file's progress reporting.
    fn compare_data(
        &self,
        old_schema: &Schema,
        new_schema: &Schema,
        old_size: u64,
        byte_offset: u64,
        total_bytes: u64,
    ) -> anyhow::Result<(DataDiff, Vec<DiffWarning>)> {
        let mut data_differ = DataDiffer::new(DataDiffOptions {
            max_pk_entries_global: self.config.max_pk_entries,
            // Per-table budget is half the global cap.
            max_pk_entries_per_table: self.config.max_pk_entries / 2,
            // Row samples are only collected in verbose mode.
            sample_size: if self.config.verbose { 100 } else { 0 },
            tables: self.config.tables.clone(),
            exclude: self.config.exclude.clone(),
            allow_no_pk: self.config.allow_no_pk,
            pk_overrides: self.config.pk_overrides.clone(),
            ignore_columns: self.config.ignore_columns.clone(),
        });
        data_differ.scan_file(
            &self.config.old_path,
            old_schema,
            self.dialect,
            true, // old side of the diff
            &self.progress_fn,
            byte_offset,
            total_bytes,
        )?;
        data_differ.scan_file(
            &self.config.new_path,
            new_schema,
            self.dialect,
            false, // new side of the diff
            &self.progress_fn,
            byte_offset + old_size,
            total_bytes,
        )?;
        Ok(data_differ.compute_diff())
    }

    /// Aggregates the schema and data diffs into top-level counts.
    ///
    /// `tables_modified` is the larger of the schema-level and data-level
    /// modified-table counts, so the field stays meaningful for
    /// schema-only, data-only, and combined runs.
    fn build_summary(
        schema_diff: Option<&SchemaDiff>,
        data_diff: Option<&DataDiff>,
    ) -> DiffSummary {
        let (tables_added, tables_removed, schema_modified) = schema_diff
            .map(|s| {
                (
                    s.tables_added.len(),
                    s.tables_removed.len(),
                    s.tables_modified.len(),
                )
            })
            .unwrap_or((0, 0, 0));
        let (rows_added, rows_removed, rows_modified, data_modified, truncated) = data_diff
            .map(|d| {
                let mut added = 0u64;
                let mut removed = 0u64;
                let mut modified = 0u64;
                let mut tables_with_changes = 0usize;
                let mut any_truncated = false;
                for table_diff in d.tables.values() {
                    added += table_diff.added_count;
                    removed += table_diff.removed_count;
                    modified += table_diff.modified_count;
                    if table_diff.added_count > 0
                        || table_diff.removed_count > 0
                        || table_diff.modified_count > 0
                    {
                        tables_with_changes += 1;
                    }
                    any_truncated |= table_diff.truncated;
                }
                (added, removed, modified, tables_with_changes, any_truncated)
            })
            .unwrap_or((0, 0, 0, 0, false));
        DiffSummary {
            tables_added,
            tables_removed,
            tables_modified: schema_modified.max(data_modified),
            rows_added,
            rows_removed,
            rows_modified,
            truncated,
        }
    }
}
/// Compiles each pattern string into a lowercase glob `Pattern`.
/// Strings that are not valid glob syntax are silently dropped.
pub fn parse_ignore_patterns(patterns: &[String]) -> Vec<Pattern> {
    let mut compiled = Vec::with_capacity(patterns.len());
    for raw in patterns {
        if let Ok(pattern) = Pattern::new(&raw.to_lowercase()) {
            compiled.push(pattern);
        }
    }
    compiled
}
/// Returns true when the lowercased `table.column` name matches any of the
/// compiled ignore patterns.
pub fn should_ignore_column(table: &str, column: &str, patterns: &[Pattern]) -> bool {
    let needle = format!("{}.{}", table.to_lowercase(), column.to_lowercase());
    for pattern in patterns {
        if pattern.matches(&needle) {
            return true;
        }
    }
    false
}
/// Decides whether `table_name` passes the include/exclude filters.
///
/// An empty `tables` list means "include everything"; a match in `exclude`
/// always wins over inclusion. Matching is case-insensitive using Unicode
/// lowercasing (so non-ASCII table names compare correctly).
pub fn should_include_table(table_name: &str, tables: &[String], exclude: &[String]) -> bool {
    // Fast path: no filters configured, nothing to lowercase.
    if tables.is_empty() && exclude.is_empty() {
        return true;
    }
    // Lowercase the candidate once instead of once per filter list.
    let name_lower = table_name.to_lowercase();
    if !tables.is_empty() && !tables.iter().any(|t| t.to_lowercase() == name_lower) {
        return false;
    }
    !exclude.iter().any(|t| t.to_lowercase() == name_lower)
}