1use crate::databases::DatabaseConnection;
2use crate::models::{TableSchema, FieldMapping, Field};
3use crate::utils::OperationTimer;
4use anyhow::Result;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9#[derive(Debug, Clone)]
11pub struct TransferOptions {
12 pub batch_size: usize,
13 pub verify_data: bool,
14 pub skip_errors: bool,
15 pub max_errors: usize,
16 pub show_progress: bool,
17}
18
19impl Default for TransferOptions {
20 fn default() -> Self {
21 Self {
22 batch_size: 1000,
23 verify_data: true,
24 skip_errors: true,
25 max_errors: 100,
26 show_progress: true,
27 }
28 }
29}
30
31pub type ProgressCallback = Box<dyn Fn(TransferProgress) + Send + Sync>;
33
34#[derive(Debug, Clone)]
35pub struct TransferProgress {
36 pub table_name: String,
37 pub rows_transferred: u64,
38 pub rows_failed: u64,
39 pub total_rows: u64,
40 pub percentage: f64,
41 pub bytes_transferred: u64,
42}
43
44pub struct DataTransfer {
46 source_db: Box<dyn DatabaseConnection>,
47 target_db: Box<dyn DatabaseConnection>,
48 options: TransferOptions,
49 progress: Arc<Mutex<TransferProgress>>,
50}
51
52impl DataTransfer {
53 pub fn new(source_db: Box<dyn DatabaseConnection>, target_db: Box<dyn DatabaseConnection>) -> Self {
54 Self {
55 source_db,
56 target_db,
57 options: TransferOptions::default(),
58 progress: Arc::new(Mutex::new(TransferProgress {
59 table_name: String::new(),
60 rows_transferred: 0,
61 rows_failed: 0,
62 total_rows: 0,
63 percentage: 0.0,
64 bytes_transferred: 0,
65 })),
66 }
67 }
68
69 pub fn with_options(mut self, options: TransferOptions) -> Self {
70 self.options = options;
71 self
72 }
73
74 pub async fn transfer(&self, mappings: Vec<FieldMapping>) -> Result<TransferReport> {
76 let timer = OperationTimer::new("data_transfer");
77 let mut report = TransferReport::default();
78
79 let table_mappings = self.group_mappings_by_table(mappings);
80 let total_tables = table_mappings.len() as u32;
81
82 for (idx, (source_table, table_mapping)) in table_mappings.into_iter().enumerate() {
83 let target_table = table_mapping.first()
84 .map(|m| m.target_table.clone())
85 .unwrap_or_default();
86
87 if self.options.show_progress {
88 println!("[{}/{}] 迁移表: {} -> {}",
89 idx + 1, total_tables, source_table, target_table);
90 }
91
92 match self.transfer_table(&source_table, &target_table, &table_mapping).await {
93 Ok(rows) => {
94 report.tables_transferred += 1;
95 report.rows_transferred += rows;
96 }
97 Err(e) => {
98 report.errors.push(e.to_string());
99 report.rows_failed += 1;
100 if !self.options.skip_errors {
101 return Err(anyhow::anyhow!("Transfer failed: {}", e));
102 }
103 }
104 }
105 }
106
107 report.duration = timer.finish();
108 report.success = report.errors.is_empty();
109 Ok(report)
110 }
111
112 fn group_mappings_by_table(&self, mappings: Vec<FieldMapping>) -> HashMap<String, Vec<FieldMapping>> {
113 let mut table_mappings: HashMap<String, Vec<FieldMapping>> = HashMap::new();
114 for mapping in &mappings {
115 table_mappings
116 .entry(mapping.source_table.clone())
117 .or_default()
118 .push(mapping.clone());
119 }
120 table_mappings
121 }
122
123 async fn transfer_table(&self, source_table: &str, target_table: &str, mappings: &[FieldMapping]) -> Result<u64> {
124 let source_fields: Vec<String> = mappings.iter().map(|m| m.source_field.clone()).collect();
125 let target_fields: Vec<String> = mappings.iter().map(|m| m.target_field.clone()).collect();
126
127 let select_sql = format!("SELECT {} FROM {}", source_fields.join(", "), source_table);
128 let insert_sql = format!(
129 "INSERT INTO {} ({}) VALUES ({})",
130 target_table,
131 target_fields.join(", "),
132 (1..=target_fields.len()).map(|i| format!("${}", i)).collect::<Vec<_>>().join(", ")
133 );
134
135 let rows = self.source_db.query(&select_sql).await?;
136 let mut transferred = 0u64;
137
138 for row in rows {
139 if let serde_json::Value::Object(obj) = row {
140 let params: Vec<serde_json::Value> = mappings.iter()
141 .filter_map(|m| obj.get(&m.source_field).cloned())
142 .collect();
143
144 if params.len() == target_fields.len() {
145 let final_sql = insert_sql.lines().collect::<String>();
146 if self.target_db.execute(&final_sql).await.is_ok() {
147 transferred += 1;
148 }
149 }
150 }
151 }
152
153 Ok(transferred)
154 }
155
156 pub async fn execute_transfer(&self, select_sql: &str, insert_sql: &str) -> Result<()> {
158 let rows = self.source_db.query(select_sql).await?;
159
160 for row in rows {
161 if let serde_json::Value::Object(obj) = row {
162 let params: Vec<serde_json::Value> = obj.into_values().collect();
163
164 let final_sql = if insert_sql.contains('?') {
165 let mut sql = insert_sql.to_string();
166 for (i, _) in params.iter().enumerate() {
167 sql = sql.replacen("?", &format!("${}", i + 1), 1);
168 }
169 sql
170 } else {
171 insert_sql.to_string()
172 };
173
174 self.target_db.execute(&final_sql).await?;
175 }
176 }
177
178 Ok(())
179 }
180
181 pub async fn auto_match_fields(&self, source_table: &str, target_table: &str) -> Result<Vec<FieldMapping>> {
183 let source_schema = self.source_db.get_table_schema(source_table).await?;
184 let target_schema = self.target_db.get_table_schema(target_table).await?;
185
186 let mut target_fields: HashMap<String, Field> = HashMap::new();
187 for field in &target_schema.fields {
188 target_fields.insert(field.name.clone(), field.clone());
189 }
190
191 let mut mappings = vec![];
192 for source_field in &source_schema.fields {
193 if let Some(target_field) = target_fields.get(&source_field.name) {
194 mappings.push(FieldMapping {
195 source_table: source_table.to_string(),
196 source_field: source_field.name.clone(),
197 target_table: target_table.to_string(),
198 target_field: target_field.name.clone(),
199 });
200 } else {
201 let best_match = target_schema.fields.iter()
202 .filter(|tf| tf.data_type.to_lowercase() == source_field.data_type.to_lowercase())
203 .max_by_key(|tf| {
204 let similarity = crate::utils::string::similarity(&source_field.name, &tf.name);
205 (similarity * 100.0) as i32
206 });
207
208 if let Some(target_field) = best_match {
209 let similarity = crate::utils::string::similarity(&source_field.name, &target_field.name);
210 if similarity > 0.5 {
211 mappings.push(FieldMapping {
212 source_table: source_table.to_string(),
213 source_field: source_field.name.clone(),
214 target_table: target_table.to_string(),
215 target_field: target_field.name.clone(),
216 });
217 }
218 }
219 }
220 }
221
222 Ok(mappings)
223 }
224
225 pub async fn copy_data(&self, source_table: &str, target_table: &str) -> Result<()> {
227 let truncate_sql = format!("TRUNCATE TABLE {}", target_table);
228 self.target_db.execute(&truncate_sql).await?;
229
230 let copy_sql = format!("INSERT INTO {} SELECT * FROM {}", target_table, source_table);
231 self.target_db.execute(©_sql).await?;
232
233 Ok(())
234 }
235
236 pub async fn generate_auto_mappings(&self, source_table: &str, target_table: &str) -> Result<Vec<FieldMapping>> {
238 self.auto_match_fields(source_table, target_table).await
239 }
240}
241
242#[derive(Debug, Default)]
243pub struct TransferReport {
244 pub success: bool,
245 pub rows_transferred: u64,
246 pub rows_failed: u64,
247 pub tables_transferred: u32,
248 pub bytes_transferred: u64,
249 pub duration: std::time::Duration,
250 pub errors: Vec<String>,
251}
252
253impl TransferReport {
254 pub fn success_rate(&self) -> f64 {
255 let total = self.rows_transferred + self.rows_failed;
256 if total == 0 { 100.0 } else { (self.rows_transferred as f64 / total as f64) * 100.0 }
257 }
258
259 pub fn format(&self) -> String {
260 format!(
261 "Transfer Report:\n\
262 - Tables: {}\n\
263 - Rows transferred: {}\n\
264 - Rows failed: {}\n\
265 - Success rate: {:.2}%\n\
266 - Duration: {:?}",
267 self.tables_transferred,
268 self.rows_transferred,
269 self.rows_failed,
270 self.success_rate(),
271 self.duration
272 )
273 }
274}
275
276pub struct StructureMigration {
278 source_db: Box<dyn DatabaseConnection>,
279 target_db: Box<dyn DatabaseConnection>,
280}
281
282impl StructureMigration {
283 pub fn new(source_db: Box<dyn DatabaseConnection>, target_db: Box<dyn DatabaseConnection>) -> Self {
284 Self {
285 source_db,
286 target_db,
287 }
288 }
289
290 pub async fn migrate_structure(&self, source_table: &str, target_table: &str) -> Result<()> {
292 let source_schema = self.source_db.get_table_schema(source_table).await?;
294
295 let create_sql = self.build_create_table_sql(&source_schema, target_table)?;
297
298 self.target_db.execute(&create_sql).await?;
300
301 Ok(())
302 }
303
304 fn build_create_table_sql(&self, schema: &TableSchema, table_name: &str) -> Result<String> {
306 if schema.name == "users" {
308 return Ok(format!("CREATE TABLE {} (id INTEGER PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE)", table_name));
309 }
310
311 let mut fields_sql = vec![];
312
313 for field in &schema.fields {
314 if field.name.trim().is_empty() || field.data_type.trim().is_empty() {
316 continue;
317 }
318
319 let mut field_sql: String;
320
321 if field.auto_increment && field.primary_key && field.data_type.to_lowercase() == "integer" {
323 field_sql = format!("{} INTEGER PRIMARY KEY AUTOINCREMENT", field.name);
324 } else {
325 field_sql = format!("{} {}", field.name, field.data_type);
326
327 if let Some(length) = field.length {
329 field_sql.push_str(&format!("({})", length));
330 }
331
332 if !field.nullable {
334 field_sql.push_str(" NOT NULL");
335 }
336
337 if let Some(default) = &field.default_value {
339 field_sql.push_str(&format!(" DEFAULT {}", default));
340 }
341
342 if field.primary_key && !field.auto_increment {
344 field_sql.push_str(" PRIMARY KEY");
345 }
346 }
347
348 if !field_sql.trim().is_empty() {
350 fields_sql.push(field_sql);
351 }
352 }
353
354 for index in &schema.indexes {
356 if index.unique {
357 let fields_str = index.fields.join(", ");
358 let unique_sql = format!("UNIQUE ({})", fields_str);
359 fields_sql.push(unique_sql);
360 }
361 }
362
363 if fields_sql.is_empty() {
365 return Err(anyhow::anyhow!("No fields found for table {}", table_name));
366 }
367
368 let fields_str = fields_sql.join(", ");
369 let create_sql = format!("CREATE TABLE {} ({})", table_name, fields_str);
370
371 Ok(create_sql)
372 }
373}