use crate::databases::DatabaseConnection;
use crate::models::{TableSchema, FieldMapping, Field};
use crate::utils::OperationTimer;
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Tunables controlling how a bulk data transfer runs.
#[derive(Debug, Clone)]
pub struct TransferOptions {
    /// Rows per insert batch. NOTE(review): not consulted anywhere in this file — confirm intended.
    pub batch_size: usize,
    /// Whether transferred data should be verified. NOTE(review): not read in this file.
    pub verify_data: bool,
    /// When true, a failed table is recorded in the report and the transfer continues;
    /// when false, the first failure aborts the whole transfer.
    pub skip_errors: bool,
    /// Maximum tolerated errors. NOTE(review): not enforced in this file.
    pub max_errors: usize,
    /// Print a per-table progress line to stdout.
    pub show_progress: bool,
}
impl Default for TransferOptions {
fn default() -> Self {
Self {
batch_size: 1000,
verify_data: true,
skip_errors: true,
max_errors: 100,
show_progress: true,
}
}
}
/// Callback invoked with progress snapshots during a transfer.
/// NOTE(review): not referenced by any code visible in this file.
pub type ProgressCallback = Box<dyn Fn(TransferProgress) + Send + Sync>;
/// Point-in-time snapshot of a running transfer.
#[derive(Debug, Clone)]
pub struct TransferProgress {
    /// Table currently being transferred.
    pub table_name: String,
    /// Rows inserted successfully so far.
    pub rows_transferred: u64,
    /// Rows that failed to insert so far.
    pub rows_failed: u64,
    /// Total rows expected for the current table.
    pub total_rows: u64,
    /// Completion percentage (0.0–100.0).
    pub percentage: f64,
    /// Bytes moved so far. NOTE(review): never updated in this file.
    pub bytes_transferred: u64,
}
/// Moves row data from one database connection to another according to
/// per-field mappings.
pub struct DataTransfer {
    // Connection rows are read from.
    source_db: Box<dyn DatabaseConnection>,
    // Connection rows are written to.
    target_db: Box<dyn DatabaseConnection>,
    // Behavior knobs; see TransferOptions.
    options: TransferOptions,
    // Shared progress snapshot. NOTE(review): allocated in `new` but never
    // updated by the transfer methods below — confirm whether this is wired
    // up elsewhere.
    progress: Arc<Mutex<TransferProgress>>,
}
impl DataTransfer {
    /// Creates a transfer between two connections with default options and a
    /// zeroed progress snapshot.
    pub fn new(source_db: Box<dyn DatabaseConnection>, target_db: Box<dyn DatabaseConnection>) -> Self {
        Self {
            source_db,
            target_db,
            options: TransferOptions::default(),
            progress: Arc::new(Mutex::new(TransferProgress {
                table_name: String::new(),
                rows_transferred: 0,
                rows_failed: 0,
                total_rows: 0,
                percentage: 0.0,
                bytes_transferred: 0,
            })),
        }
    }

    /// Builder-style override of the default [`TransferOptions`].
    pub fn with_options(mut self, options: TransferOptions) -> Self {
        self.options = options;
        self
    }

    /// Transfers every mapped table and returns an aggregate report.
    ///
    /// Tables are processed sequentially. When `options.skip_errors` is set,
    /// a failing table is recorded in the report and the loop continues;
    /// otherwise the first failure aborts the whole transfer.
    pub async fn transfer(&self, mappings: Vec<FieldMapping>) -> Result<TransferReport> {
        let timer = OperationTimer::new("data_transfer");
        let mut report = TransferReport::default();
        let table_mappings = self.group_mappings_by_table(mappings);
        let total_tables = table_mappings.len() as u32;
        for (idx, (source_table, table_mapping)) in table_mappings.into_iter().enumerate() {
            // Every mapping in a group shares the same target table; fall
            // back to "" defensively (a group is never empty in practice).
            let target_table = table_mapping
                .first()
                .map(|m| m.target_table.clone())
                .unwrap_or_default();
            if self.options.show_progress {
                println!("[{}/{}] 迁移表: {} -> {}",
                    idx + 1, total_tables, source_table, target_table);
            }
            match self.transfer_table(&source_table, &target_table, &table_mapping).await {
                Ok(rows) => {
                    report.tables_transferred += 1;
                    report.rows_transferred += rows;
                }
                Err(e) => {
                    report.errors.push(e.to_string());
                    // NOTE(review): one failed *table* is counted as one failed
                    // row here — the per-row failure count is unknown at this
                    // level.
                    report.rows_failed += 1;
                    if !self.options.skip_errors {
                        return Err(anyhow::anyhow!("Transfer failed: {}", e));
                    }
                }
            }
        }
        report.duration = timer.finish();
        report.success = report.errors.is_empty();
        Ok(report)
    }

    /// Groups mappings by their source table name.
    fn group_mappings_by_table(&self, mappings: Vec<FieldMapping>) -> HashMap<String, Vec<FieldMapping>> {
        let mut table_mappings: HashMap<String, Vec<FieldMapping>> = HashMap::new();
        // Consume the vector instead of cloning every mapping (the original
        // iterated by reference and cloned each element).
        for mapping in mappings {
            table_mappings
                .entry(mapping.source_table.clone())
                .or_default()
                .push(mapping);
        }
        table_mappings
    }

    /// Copies all mapped columns of `source_table` into `target_table`,
    /// returning the number of rows successfully inserted.
    ///
    /// Rows missing any mapped source field are skipped; individual insert
    /// failures are tolerated (the row is simply not counted).
    async fn transfer_table(&self, source_table: &str, target_table: &str, mappings: &[FieldMapping]) -> Result<u64> {
        let source_fields: Vec<String> = mappings.iter().map(|m| m.source_field.clone()).collect();
        let target_fields: Vec<String> = mappings.iter().map(|m| m.target_field.clone()).collect();
        let select_sql = format!("SELECT {} FROM {}", source_fields.join(", "), source_table);
        let rows = self.source_db.query(&select_sql).await?;
        let mut transferred = 0u64;
        for row in rows {
            if let serde_json::Value::Object(obj) = row {
                let params: Vec<serde_json::Value> = mappings.iter()
                    .filter_map(|m| obj.get(&m.source_field).cloned())
                    .collect();
                // Only insert rows where every mapped field was present.
                if params.len() == target_fields.len() {
                    // BUG FIX: the original generated "$1, $2, ..." placeholders
                    // and executed the statement without binding any values
                    // (`execute` takes raw SQL only), so inserts could never
                    // carry row data. Values are now rendered as escaped SQL
                    // literals instead.
                    let values: Vec<String> = params.iter().map(Self::sql_literal).collect();
                    let insert_sql = format!(
                        "INSERT INTO {} ({}) VALUES ({})",
                        target_table,
                        target_fields.join(", "),
                        values.join(", ")
                    );
                    if self.target_db.execute(&insert_sql).await.is_ok() {
                        transferred += 1;
                    }
                }
            }
        }
        Ok(transferred)
    }

    /// Renders a JSON value as a SQL literal: strings are single-quoted with
    /// embedded quotes doubled, `null` becomes `NULL`, numbers/bools use
    /// their JSON text form, and arrays/objects fall back to their JSON
    /// serialization.
    fn sql_literal(value: &serde_json::Value) -> String {
        match value {
            serde_json::Value::Null => "NULL".to_string(),
            serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
            other => other.to_string(),
        }
    }

    /// Streams rows from `select_sql` into `insert_sql` on the target.
    ///
    /// `insert_sql` may contain `?` placeholders, which are substituted
    /// positionally with the row's values in the JSON object's key order
    /// (NOTE(review): confirm that order matches the placeholder order —
    /// it depends on serde_json's map implementation).
    ///
    /// BUG FIX: the original replaced each `?` with "$n" and executed the
    /// statement without binding values; values are now inlined as escaped
    /// literals. CAVEAT: a literal `?` inside a string constant in
    /// `insert_sql` would also be substituted.
    pub async fn execute_transfer(&self, select_sql: &str, insert_sql: &str) -> Result<()> {
        let rows = self.source_db.query(select_sql).await?;
        for row in rows {
            if let serde_json::Value::Object(obj) = row {
                let params: Vec<serde_json::Value> = obj.into_values().collect();
                let final_sql = if insert_sql.contains('?') {
                    let mut sql = insert_sql.to_string();
                    for value in &params {
                        sql = sql.replacen('?', &Self::sql_literal(value), 1);
                    }
                    sql
                } else {
                    insert_sql.to_string()
                };
                self.target_db.execute(&final_sql).await?;
            }
        }
        Ok(())
    }

    /// Builds field mappings between two tables: exact name matches first,
    /// then, for unmatched source fields, the most similarly-named target
    /// field with the same (case-insensitive) data type — accepted only when
    /// its name similarity exceeds 0.5.
    pub async fn auto_match_fields(&self, source_table: &str, target_table: &str) -> Result<Vec<FieldMapping>> {
        let source_schema = self.source_db.get_table_schema(source_table).await?;
        let target_schema = self.target_db.get_table_schema(target_table).await?;
        let mut target_fields: HashMap<String, Field> = HashMap::new();
        for field in &target_schema.fields {
            target_fields.insert(field.name.clone(), field.clone());
        }
        let mut mappings = vec![];
        for source_field in &source_schema.fields {
            if let Some(target_field) = target_fields.get(&source_field.name) {
                mappings.push(FieldMapping {
                    source_table: source_table.to_string(),
                    source_field: source_field.name.clone(),
                    target_table: target_table.to_string(),
                    target_field: target_field.name.clone(),
                });
            } else {
                // Compute each candidate's similarity exactly once (the
                // original recomputed it for the winner after `max_by_key`).
                let best_match = target_schema.fields.iter()
                    .filter(|tf| tf.data_type.to_lowercase() == source_field.data_type.to_lowercase())
                    .map(|tf| (crate::utils::string::similarity(&source_field.name, &tf.name), tf))
                    .max_by_key(|(similarity, _)| (similarity * 100.0) as i32);
                if let Some((similarity, target_field)) = best_match {
                    if similarity > 0.5 {
                        mappings.push(FieldMapping {
                            source_table: source_table.to_string(),
                            source_field: source_field.name.clone(),
                            target_table: target_table.to_string(),
                            target_field: target_field.name.clone(),
                        });
                    }
                }
            }
        }
        Ok(mappings)
    }

    /// Truncates `target_table`, then bulk-copies all rows from
    /// `source_table` via `INSERT INTO ... SELECT *`. Both statements run on
    /// the *target* connection, so both tables must be visible there.
    /// Destructive: existing target rows are removed first.
    pub async fn copy_data(&self, source_table: &str, target_table: &str) -> Result<()> {
        let truncate_sql = format!("TRUNCATE TABLE {}", target_table);
        self.target_db.execute(&truncate_sql).await?;
        let copy_sql = format!("INSERT INTO {} SELECT * FROM {}", target_table, source_table);
        // BUG FIX: the original read `execute(©_sql)` — a mangled `&copy_sql`
        // (HTML entity "&copy;"), which does not compile.
        self.target_db.execute(&copy_sql).await?;
        Ok(())
    }

    /// Alias for [`auto_match_fields`], kept for API compatibility.
    pub async fn generate_auto_mappings(&self, source_table: &str, target_table: &str) -> Result<Vec<FieldMapping>> {
        self.auto_match_fields(source_table, target_table).await
    }
}
/// Aggregate outcome of a [`DataTransfer::transfer`] run.
#[derive(Debug, Default)]
pub struct TransferReport {
    /// True when no errors were recorded.
    pub success: bool,
    /// Total rows inserted across all tables.
    pub rows_transferred: u64,
    /// Failure count. NOTE(review): incremented once per failed *table* by
    /// `transfer`, not per row.
    pub rows_failed: u64,
    /// Number of tables that completed without error.
    pub tables_transferred: u32,
    /// Bytes moved. NOTE(review): never updated in this file.
    pub bytes_transferred: u64,
    /// Wall-clock duration of the whole transfer.
    pub duration: std::time::Duration,
    /// One message per failed table.
    pub errors: Vec<String>,
}
impl TransferReport {
    /// Percentage of attempted rows that transferred successfully;
    /// returns 100% when nothing was attempted.
    pub fn success_rate(&self) -> f64 {
        match self.rows_transferred + self.rows_failed {
            0 => 100.0,
            total => self.rows_transferred as f64 / total as f64 * 100.0,
        }
    }

    /// Human-readable multi-line summary of the transfer outcome.
    pub fn format(&self) -> String {
        let detail = [
            format!("- Tables: {}", self.tables_transferred),
            format!("- Rows transferred: {}", self.rows_transferred),
            format!("- Rows failed: {}", self.rows_failed),
            format!("- Success rate: {:.2}%", self.success_rate()),
            format!("- Duration: {:?}", self.duration),
        ];
        format!("Transfer Report:\n{}", detail.join("\n"))
    }
}
/// Recreates table structure (not data) from a source database on a target
/// database.
pub struct StructureMigration {
    // Connection the schema is read from.
    source_db: Box<dyn DatabaseConnection>,
    // Connection the CREATE TABLE statement is executed on.
    target_db: Box<dyn DatabaseConnection>,
}
impl StructureMigration {
    /// Creates a structure migrator between two connections.
    pub fn new(source_db: Box<dyn DatabaseConnection>, target_db: Box<dyn DatabaseConnection>) -> Self {
        Self {
            source_db,
            target_db,
        }
    }

    /// Reads `source_table`'s schema and creates `target_table` with an
    /// equivalent definition on the target connection.
    pub async fn migrate_structure(&self, source_table: &str, target_table: &str) -> Result<()> {
        let source_schema = self.source_db.get_table_schema(source_table).await?;
        let create_sql = self.build_create_table_sql(&source_schema, target_table)?;
        self.target_db.execute(&create_sql).await?;
        Ok(())
    }

    /// Builds a CREATE TABLE statement (SQLite-flavored AUTOINCREMENT form)
    /// from `schema`.
    ///
    /// # Errors
    /// Fails when the schema yields no usable column definitions.
    fn build_create_table_sql(&self, schema: &TableSchema, table_name: &str) -> Result<String> {
        // TODO(review): hard-coded override for the "users" table ignores the
        // actual source schema and looks like a leftover test shim. Kept for
        // behavioral compatibility — consider removing.
        if schema.name == "users" {
            return Ok(format!("CREATE TABLE {} (id INTEGER PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE)", table_name));
        }
        let mut fields_sql = vec![];
        for field in &schema.fields {
            // Skip malformed entries with a blank name or type.
            if field.name.trim().is_empty() || field.data_type.trim().is_empty() {
                continue;
            }
            let field_sql = if field.auto_increment && field.primary_key && field.data_type.to_lowercase() == "integer" {
                // SQLite rowid-alias form; length/nullability/default clauses
                // do not apply here.
                format!("{} INTEGER PRIMARY KEY AUTOINCREMENT", field.name)
            } else {
                let mut sql = format!("{} {}", field.name, field.data_type);
                if let Some(length) = field.length {
                    sql.push_str(&format!("({})", length));
                }
                if !field.nullable {
                    sql.push_str(" NOT NULL");
                }
                if let Some(default) = &field.default_value {
                    sql.push_str(&format!(" DEFAULT {}", default));
                }
                // BUG FIX: the original only emitted PRIMARY KEY when
                // `!auto_increment`, so an auto-increment primary key of a
                // non-integer type lost its PRIMARY KEY clause entirely.
                if field.primary_key {
                    sql.push_str(" PRIMARY KEY");
                }
                sql
            };
            if !field_sql.trim().is_empty() {
                fields_sql.push(field_sql);
            }
        }
        // Unique indexes become table-level UNIQUE constraints.
        for index in &schema.indexes {
            if index.unique {
                fields_sql.push(format!("UNIQUE ({})", index.fields.join(", ")));
            }
        }
        if fields_sql.is_empty() {
            return Err(anyhow::anyhow!("No fields found for table {}", table_name));
        }
        Ok(format!("CREATE TABLE {} ({})", table_name, fields_sql.join(", ")))
    }
}