1#[cfg(feature = "database")]
4use sqlx::{MySql, Pool, Row};
5use std::collections::HashMap;
6use librarys::random_letters;
7use serde_json::Value;
8use crate::error::{DataForgeError, Result};
9use crate::generators::{number, datetime, internet};
10use crate::db::schema::{TableSchema, FieldSchema, DataType, FieldGeneratorType};
11use super::{DatabaseFiller, FillingConfig, FillingStats, ConnectionConfig};
12
13#[cfg(feature = "database")]
15pub struct MySqlFiller {
16 pool: Pool<MySql>,
17 config: FillingConfig,
18 stats: FillingStats,
19}
20
21#[cfg(feature = "database")]
22impl MySqlFiller {
23 pub async fn new(connection_string: &str, config: FillingConfig) -> Result<Self> {
25 let pool = sqlx::mysql::MySqlPoolOptions::new()
26 .max_connections(10)
27 .connect(connection_string)
28 .await
29 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
30
31 Ok(Self {
32 pool,
33 config,
34 stats: FillingStats::default(),
35 })
36 }
37
38 pub async fn from_config(config: ConnectionConfig, filling_config: FillingConfig) -> Result<Self> {
40 let connection_string = config.to_connection_string()?;
41 Self::new(&connection_string, filling_config).await
42 }
43
44 pub async fn table_exists(&self, table_name: &str) -> Result<bool> {
46 let query = "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = ?";
47 let row = sqlx::query(query)
48 .bind(table_name)
49 .fetch_one(&self.pool)
50 .await
51 .map_err(|e| DataForgeError::database(&format!("Failed to check table existence: {}", e)))?;
52
53 let count: i64 = row.try_get("count")
54 .map_err(|e| DataForgeError::database(&format!("Failed to get count: {}", e)))?;
55
56 Ok(count > 0)
57 }
58
59 pub async fn get_table_schema(&self, table_name: &str) -> Result<TableSchema> {
61 let query = r#"
62 SELECT
63 COLUMN_NAME as column_name,
64 DATA_TYPE as data_type,
65 IS_NULLABLE as is_nullable,
66 COLUMN_DEFAULT as column_default,
67 CHARACTER_MAXIMUM_LENGTH as max_length,
68 NUMERIC_PRECISION as numeric_precision,
69 NUMERIC_SCALE as numeric_scale
70 FROM information_schema.COLUMNS
71 WHERE table_schema = DATABASE() AND table_name = ?
72 ORDER BY ORDINAL_POSITION
73 "#;
74
75 let rows = sqlx::query(query)
76 .bind(table_name)
77 .fetch_all(&self.pool)
78 .await
79 .map_err(|e| DataForgeError::database(&format!("Failed to get table schema: {}", e)))?;
80
81 let mut fields = Vec::new();
82 for row in rows {
83 let column_name: String = row.try_get("column_name")?;
84 let data_type_str: String = row.try_get("data_type")?;
85 let is_nullable: String = row.try_get("is_nullable")?;
86 let max_length: Option<u32> = row.try_get("max_length").ok();
87
88 let data_type = self.mysql_type_to_datatype(&data_type_str, max_length);
89 let nullable = is_nullable.to_uppercase() == "YES";
90
91 fields.push(FieldSchema {
92 name: column_name,
93 data_type,
94 constraints: crate::db::schema::FieldConstraints {
95 nullable,
96 unique: false,
97 default: None,
98 pattern: None,
99 min: None,
100 max: None,
101 },
102 description: None,
103 generator_config: None,
104 generator_type: None,
105 });
106 }
107
108 Ok(TableSchema {
109 name: table_name.to_string(),
110 fields,
111 primary_key: None,
112 indexes: vec![],
113 description: None,
114 })
115 }
116
117 pub async fn fill_table(&mut self, table_name: &str, count: usize) -> Result<usize> {
119 if !self.table_exists(table_name).await? {
121 return Err(DataForgeError::database(&format!("Table '{}' does not exist", table_name)));
122 }
123
124 let schema = self.get_table_schema(table_name).await?;
126
127 let data = self.generate_table_data(count, &schema)?;
129
130 self.batch_insert(table_name, data).await
132 }
133
134 fn generate_table_data(&self, count: usize, schema: &TableSchema) -> Result<Vec<HashMap<String, Value>>> {
136 let mut data = Vec::with_capacity(count);
137
138 for _i in 1..=count {
140 let mut row = HashMap::new();
141
142 for field in &schema.fields {
143 let value = self.generate_value_for_field(field)?;
144 row.insert(field.name.clone(), value);
145 }
146
147 data.push(row);
148 }
149
150 Ok(data)
151 }
152
153 fn mysql_type_to_datatype(&self, mysql_type: &str, max_length: Option<u32>) -> DataType {
155 match mysql_type.to_lowercase().as_str() {
156 "varchar" | "char" | "text" | "tinytext" | "mediumtext" | "longtext" => {
157 DataType::String { max_length: max_length.map(|l| l as usize) }
158 },
159 "int" | "tinyint" | "smallint" | "mediumint" | "bigint" => {
160 DataType::Integer { min: None, max: None }
161 },
162 "float" | "double" | "decimal" => {
163 DataType::Float { min: None, max: None, precision: None }
164 },
165 "boolean" | "bool" => DataType::Boolean,
166 "date" => DataType::Date { format: None },
167 "datetime" | "timestamp" => DataType::DateTime { format: None },
168 "time" => DataType::Time { format: None },
169 "json" => DataType::Json,
170 _ => DataType::String { max_length: Some(255) },
171 }
172 }
173
174 #[allow(dead_code)]
176 fn generate_create_table_sql(&self, schema: &TableSchema) -> Result<String> {
177 use crate::db::schema::DataType;
178
179 let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
180
181 let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
182 let data_type = match &field.data_type {
183 DataType::String { max_length } => {
184 format!("VARCHAR({})", max_length.unwrap_or(255))
185 },
186 DataType::Integer { .. } => "INT".to_string(),
187 DataType::Float { .. } => "FLOAT".to_string(),
188 DataType::Boolean => "BOOLEAN".to_string(),
189 DataType::DateTime { .. } => "DATETIME".to_string(),
190 DataType::Date { .. } => "DATE".to_string(),
191 DataType::Email => "VARCHAR(255)".to_string(),
192 DataType::Phone { .. } => "VARCHAR(20)".to_string(),
193 _ => "TEXT".to_string(),
194 };
195
196 let mut def = format!("{} {}", field.name, data_type);
197
198 if !field.constraints.nullable {
199 def.push_str(" NOT NULL");
200 }
201
202 if field.constraints.unique {
203 def.push_str(" UNIQUE");
204 }
205
206 def
207 }).collect();
208
209 sql.push_str(&field_definitions.join(", "));
210
211 if let Some(ref pk) = schema.primary_key {
212 sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
213 }
214
215 sql.push_str(")");
216 Ok(sql)
217 }
218
219 fn generate_value_for_field(&self, field: &FieldSchema) -> Result<Value> {
221 if let Some(generator_type) = &field.generator_type {
223 return self.generate_value_by_generator_type(field, generator_type);
224 }
225
226 match &field.data_type {
228 DataType::String { max_length } => {
229 let max_len = max_length.unwrap_or(255) as usize;
230 let len = std::cmp::min(10, max_len);
231 Ok(Value::String(random_letters(len)))
232 },
233 DataType::Integer { min, max } => {
234 let min_val = min.unwrap_or(0);
235 let max_val = max.unwrap_or(1000000);
236 Ok(Value::Number(serde_json::Number::from(number::random_int(min_val as i32, max_val as i32))))
237 },
238 DataType::Float { .. } => {
239 Ok(Value::Number(serde_json::Number::from_f64(number::random_float(0.0, 100.0)).unwrap()))
240 },
241 DataType::Boolean => {
242 Ok(Value::Bool(rand::random()))
243 },
244 DataType::DateTime { .. } => {
245 Ok(Value::String(datetime::iso8601()))
246 },
247 DataType::Date { .. } => {
248 Ok(Value::String(datetime::date_iso()))
249 },
250 DataType::Email => {
251 Ok(Value::String(internet::email()))
252 },
253 DataType::Phone { .. } => {
254 Ok(Value::String(number::phone_number_cn()))
255 },
256 _ => {
257 Ok(Value::String("default_value".to_string()))
258 }
259 }
260 }
261
262 fn generate_value_by_generator_type(&self, field: &FieldSchema, generator_type: &FieldGeneratorType) -> Result<Value> {
264 match generator_type {
265 FieldGeneratorType::Default => {
266 self.generate_value_for_field(field)
268 },
269 FieldGeneratorType::Custom(generator_name) => {
270 Ok(Value::String(format!("custom_{}", generator_name)))
272 },
273 FieldGeneratorType::RandomString => {
274 let len = match &field.data_type {
275 DataType::String { max_length } => max_length.unwrap_or(10),
276 _ => 10,
277 };
278 Ok(Value::String(random_letters(len as usize)))
279 },
280 FieldGeneratorType::RandomInteger => {
281 Ok(Value::Number(serde_json::Number::from(crate::generators::random_int(0, 1000000))))
282 },
283 FieldGeneratorType::RandomFloat => {
284 Ok(Value::Number(serde_json::Number::from_f64(crate::generators::random_float(0.0, 1000.0)).unwrap()))
285 },
286 FieldGeneratorType::RandomBoolean => {
287 Ok(Value::Bool(crate::generators::random_bool()))
288 },
289 FieldGeneratorType::CurrentTimestamp => {
290 Ok(Value::String(datetime::iso8601()))
291 },
292 FieldGeneratorType::RandomDate => {
293 Ok(Value::String(datetime::date_iso()))
294 },
295 FieldGeneratorType::RandomDateTime => {
296 Ok(Value::String(datetime::iso8601()))
297 },
298 FieldGeneratorType::RandomEmail => {
299 Ok(Value::String(crate::generators::internet::email()))
300 },
301 FieldGeneratorType::RandomPhone => {
302 Ok(Value::String(crate::generators::number::phone_number_cn()))
303 },
304 FieldGeneratorType::RandomUrl => {
305 Ok(Value::String(crate::generators::internet::url()))
306 },
307 FieldGeneratorType::Uuid => {
308 Ok(Value::String(crate::generators::uuid_v4()))
309 },
310 FieldGeneratorType::Name => {
311 match crate::generation::DataForge::default().language() {
313 crate::generation::Language::ZhCN => {
314 Ok(Value::String(crate::generators::name::zh_cn_fullname()))
315 },
316 crate::generation::Language::EnUS => {
317 Ok(Value::String(crate::generators::name::en_us_fullname()))
318 },
319 crate::generation::Language::JaJP => {
320 Ok(Value::String(crate::generators::name::ja_jp_fullname()))
321 },
322 }
323 },
324 FieldGeneratorType::CompanyName => {
325 Ok(Value::String(format!("Company {}", crate::generators::random_int(1, 1000))))
326 },
327 FieldGeneratorType::Address => {
328 match crate::generation::DataForge::default().language() {
330 crate::generation::Language::ZhCN => {
331 Ok(Value::String(format!("{}市{}区{}街道{}",
332 crate::generators::address::zh_city(),
333 crate::generators::address::zh_city(),
334 crate::generators::random_int(1, 100),
335 crate::generators::random_int(1, 1000))))
336 },
337 crate::generation::Language::EnUS => {
338 Ok(Value::String(format!("{} Street {}, {}, {}",
339 crate::generators::address::us_city(),
340 crate::generators::random_int(1, 1000),
341 crate::generators::address::us_city(),
342 crate::generators::address::us_state())))
343 },
344 crate::generation::Language::JaJP => {
345 Ok(Value::String(format!("{}{}{}丁目{}",
346 crate::generators::address::us_city(), crate::generators::address::us_city(), crate::generators::random_int(1, 20),
349 crate::generators::random_int(1, 20))))
350 },
351 }
352 },
353 FieldGeneratorType::ProductName => {
354 let products = ["Laptop", "Smartphone", "Tablet", "Watch", "Headphones", "Speaker"];
355 let product = products[crate::generators::random_int(0, products.len() as i32 - 1) as usize];
356 Ok(Value::String(format!("{} {}", product, crate::generators::random_int(1, 1000))))
357 },
358 FieldGeneratorType::OrderStatus => {
359 let statuses = ["Pending", "Processing", "Shipped", "Delivered", "Cancelled"];
360 let status = statuses[crate::generators::random_int(0, statuses.len() as i32 - 1) as usize];
361 Ok(Value::String(status.to_string()))
362 },
363 }
364 }
365
366 pub async fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<usize> {
368 if data.is_empty() {
369 return Ok(0);
370 }
371
372 let start_time = std::time::Instant::now();
373 let mut total_inserted = 0;
374
375 for chunk in data.chunks(self.config.batch_size) {
377 let inserted = self.insert_chunk(table_name, chunk).await?;
378 total_inserted += inserted;
379 }
380
381 self.stats.successful_rows += total_inserted;
383 self.stats.processing_time_ms += start_time.elapsed().as_millis() as u64;
384 self.stats.calculate_speed();
385
386 Ok(total_inserted)
387 }
388
389 async fn insert_chunk(&self, table_name: &str, chunk: &[HashMap<String, Value>]) -> Result<usize> {
391 if chunk.is_empty() {
392 return Ok(0);
393 }
394
395 let fields: Vec<String> = chunk[0].keys().cloned().collect();
397 let field_list = fields.join(", ");
398 let placeholders = fields.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
399
400 let sql = format!("INSERT INTO {} ({}) VALUES ({})", table_name, field_list, placeholders);
401
402 let mut transaction = self.pool.begin().await
403 .map_err(|e| DataForgeError::database(&format!("Failed to start transaction: {}", e)))?;
404
405 for row in chunk {
406 let mut query = sqlx::query(&sql);
407
408 for field in &fields {
409 let value = row.get(field).unwrap_or(&Value::Null);
410 query = match value {
411 Value::String(s) => query.bind(s),
412 Value::Number(n) => {
413 if let Some(i) = n.as_i64() {
414 query.bind(i)
415 } else if let Some(f) = n.as_f64() {
416 query.bind(f)
417 } else {
418 query.bind(n.to_string())
419 }
420 },
421 Value::Bool(b) => query.bind(*b),
422 Value::Null => query.bind(Option::<String>::None),
423 _ => query.bind(value.to_string()),
424 };
425 }
426
427 query.execute(&mut *transaction).await
428 .map_err(|e| DataForgeError::database(&format!("Failed to execute insert: {}", e)))?;
429 }
430
431 transaction.commit().await
432 .map_err(|e| DataForgeError::database(&format!("Failed to commit transaction: {}", e)))?;
433
434 Ok(chunk.len())
435 }
436}
437
438pub struct MySqlFillerSync {
440 pub connection_string: String,
441 pub config: FillingConfig,
442 pub stats: FillingStats,
443}
444
445impl MySqlFillerSync {
446 pub fn new(connection_string: String, config: FillingConfig) -> Self {
448 Self {
449 connection_string,
450 config,
451 stats: FillingStats::default(),
452 }
453 }
454
455 pub fn from_config(config: ConnectionConfig, filling_config: FillingConfig) -> Result<Self> {
457 let connection_string = config.to_connection_string()?;
458 Ok(Self::new(connection_string, filling_config))
459 }
460
461 fn generate_create_table_sql(&self, schema: &TableSchema) -> Result<String> {
463 use crate::db::schema::DataType;
464
465 let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
466
467 let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
468 let data_type = match &field.data_type {
469 DataType::String { max_length } => {
470 format!("VARCHAR({})", max_length.unwrap_or(255))
471 },
472 DataType::Integer { .. } => "INT".to_string(),
473 DataType::Float { .. } => "FLOAT".to_string(),
474 DataType::Boolean => "BOOLEAN".to_string(),
475 DataType::DateTime { .. } => "DATETIME".to_string(),
476 DataType::Date { .. } => "DATE".to_string(),
477 DataType::Email => "VARCHAR(255)".to_string(),
478 DataType::Phone { .. } => "VARCHAR(20)".to_string(),
479 _ => "TEXT".to_string(),
480 };
481
482 let mut def = format!("{} {}", field.name, data_type);
483
484 if !field.constraints.nullable {
485 def.push_str(" NOT NULL");
486 }
487
488 if field.constraints.unique {
489 def.push_str(" UNIQUE");
490 }
491
492 def
493 }).collect();
494
495 sql.push_str(&field_definitions.join(", "));
496
497 if let Some(ref pk) = schema.primary_key {
498 sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
499 }
500
501 sql.push_str(")");
502 Ok(sql)
503 }
504
505 pub fn table_exists(&self, table_name: &str) -> Result<bool> {
507 let rt = tokio::runtime::Runtime::new()
509 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
510
511 rt.block_on(async {
512 let pool = sqlx::mysql::MySqlPoolOptions::new()
513 .max_connections(1)
514 .connect(&self.connection_string)
515 .await
516 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
517
518 let query = "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = ?";
519 let row = sqlx::query(query)
520 .bind(table_name)
521 .fetch_one(&pool)
522 .await
523 .map_err(|e| DataForgeError::database(&format!("Failed to check table existence: {}", e)))?;
524
525 let count: i64 = row.try_get("count")
526 .map_err(|e| DataForgeError::database(&format!("Failed to get count: {}", e)))?;
527
528 pool.close().await;
529 Ok(count > 0)
530 })
531 }
532
533 pub fn get_table_schema(&self, table_name: &str) -> Result<TableSchema> {
535 let rt = tokio::runtime::Runtime::new()
537 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
538
539 rt.block_on(async {
540 let pool = sqlx::mysql::MySqlPoolOptions::new()
541 .max_connections(1)
542 .connect(&self.connection_string)
543 .await
544 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
545
546 let query = r#"
547 SELECT
548 COLUMN_NAME as column_name,
549 DATA_TYPE as data_type,
550 IS_NULLABLE as is_nullable,
551 COLUMN_DEFAULT as column_default,
552 CHARACTER_MAXIMUM_LENGTH as max_length,
553 NUMERIC_PRECISION as numeric_precision,
554 NUMERIC_SCALE as numeric_scale
555 FROM information_schema.COLUMNS
556 WHERE table_schema = DATABASE() AND table_name = ?
557 ORDER BY ORDINAL_POSITION
558 "#;
559
560 let rows = sqlx::query(query)
561 .bind(table_name)
562 .fetch_all(&pool)
563 .await
564 .map_err(|e| DataForgeError::database(&format!("Failed to get table schema: {}", e)))?;
565
566 let mut fields = Vec::new();
567 for row in rows {
568 let column_name: String = row.try_get("column_name")
569 .map_err(|e| DataForgeError::database(&format!("Failed to get column_name: {}", e)))?;
570 let data_type_str: String = row.try_get("data_type")
571 .map_err(|e| DataForgeError::database(&format!("Failed to get data_type: {}", e)))?;
572 let is_nullable: String = row.try_get("is_nullable")
573 .map_err(|e| DataForgeError::database(&format!("Failed to get is_nullable: {}", e)))?;
574 let max_length: Option<u32> = row.try_get("max_length").ok();
575
576 let data_type = self.mysql_type_to_datatype(&data_type_str, max_length);
577 let nullable = is_nullable.to_uppercase() == "YES";
578
579 fields.push(FieldSchema {
580 name: column_name,
581 data_type,
582 constraints: crate::db::schema::FieldConstraints {
583 nullable,
584 unique: false,
585 default: None,
586 pattern: None,
587 min: None,
588 max: None,
589 },
590 description: None,
591 generator_config: None,
592 generator_type: None,
593 });
594 }
595
596 pool.close().await;
597
598 Ok(TableSchema {
599 name: table_name.to_string(),
600 fields,
601 primary_key: None,
602 indexes: vec![],
603 description: None,
604 })
605 })
606 }
607
608 fn mysql_type_to_datatype(&self, mysql_type: &str, max_length: Option<u32>) -> DataType {
610 match mysql_type.to_lowercase().as_str() {
611 "varchar" | "char" | "text" | "tinytext" | "mediumtext" | "longtext" => {
612 DataType::String { max_length: max_length.map(|l| l as usize) }
613 },
614 "int" | "tinyint" | "smallint" | "mediumint" | "bigint" => {
615 DataType::Integer { min: None, max: None }
616 },
617 "float" | "double" | "decimal" => {
618 DataType::Float { min: None, max: None, precision: None }
619 },
620 "boolean" | "bool" => DataType::Boolean,
621 "date" => DataType::Date { format: None },
622 "datetime" | "timestamp" => DataType::DateTime { format: None },
623 "time" => DataType::Time { format: None },
624 "json" => DataType::Json,
625 _ => DataType::String { max_length: Some(255) },
626 }
627 }
628
629 pub fn fill_table(&mut self, table_name: &str, count: usize) -> Result<usize> {
631 let start_time = std::time::Instant::now();
632
633 if !self.table_exists(table_name)? {
635 return Err(DataForgeError::database(&format!("Table '{}' does not exist", table_name)));
636 }
637
638 let schema = self.get_table_schema(table_name)?;
640
641 let data = self.generate_table_data(count, &schema)?;
643
644 let inserted = self.batch_insert(table_name, data)?;
646
647 self.stats.total_rows = count;
649 self.stats.successful_rows = inserted;
650 self.stats.processing_time_ms = start_time.elapsed().as_millis() as u64;
651 self.stats.calculate_speed();
652
653 #[cfg(debug_assertions)]
654 {
655 println!("成功插入 {} 条记录到表 {}", inserted, table_name);
656 println!("处理时间: {}ms", self.stats.processing_time_ms);
657 }
658
659 Ok(inserted)
660 }
661
662 fn generate_table_data(&self, count: usize, schema: &TableSchema) -> Result<Vec<HashMap<String, Value>>> {
664 let mut data = Vec::with_capacity(count);
665
666 for _i in 1..=count {
668 let mut row = HashMap::new();
669
670 for field in &schema.fields {
671 let value = self.generate_value_for_field(field)?;
672 row.insert(field.name.clone(), value);
673 }
674
675 data.push(row);
676 }
677
678 Ok(data)
679 }
680
681 fn generate_value_for_field(&self, field: &FieldSchema) -> Result<Value> {
683 if let Some(generator_type) = &field.generator_type {
685 return self.generate_value_by_generator_type(field, generator_type);
686 }
687
688 match &field.data_type {
690 DataType::String { max_length } => {
691 let max_len = max_length.unwrap_or(255) as usize;
692 let len = std::cmp::min(10, max_len);
693 Ok(Value::String(random_letters(len)))
694 },
695 DataType::Integer { min, max } => {
696 let min_val = min.unwrap_or(0);
697 let max_val = max.unwrap_or(1000000);
698 Ok(Value::Number(serde_json::Number::from(number::random_int(min_val as i32, max_val as i32))))
699 },
700 DataType::Float { .. } => {
701 Ok(Value::Number(serde_json::Number::from_f64(number::random_float(0.0, 100.0)).unwrap()))
702 },
703 DataType::Boolean => {
704 Ok(Value::Bool(rand::random()))
705 },
706 DataType::DateTime { .. } => {
707 Ok(Value::String(datetime::iso8601()))
708 },
709 DataType::Date { .. } => {
710 Ok(Value::String(datetime::date_iso()))
711 },
712 DataType::Email => {
713 Ok(Value::String(internet::email()))
714 },
715 DataType::Phone { .. } => {
716 Ok(Value::String(number::phone_number_cn()))
717 },
718 _ => {
719 Ok(Value::String("default_value".to_string()))
720 }
721 }
722 }
723
724 fn generate_value_by_generator_type(&self, field: &FieldSchema, generator_type: &FieldGeneratorType) -> Result<Value> {
726 match generator_type {
727 FieldGeneratorType::Default => {
728 self.generate_value_for_field(field)
730 },
731 FieldGeneratorType::Custom(generator_name) => {
732 Ok(Value::String(format!("custom_{}", generator_name)))
734 },
735 FieldGeneratorType::RandomString => {
736 let len = match &field.data_type {
737 DataType::String { max_length } => max_length.unwrap_or(10),
738 _ => 10,
739 };
740 Ok(Value::String(random_letters(len as usize)))
741 },
742 FieldGeneratorType::RandomInteger => {
743 Ok(Value::Number(serde_json::Number::from(crate::generators::random_int(0, 1000000))))
744 },
745 FieldGeneratorType::RandomFloat => {
746 Ok(Value::Number(serde_json::Number::from_f64(crate::generators::random_float(0.0, 1000.0)).unwrap()))
747 },
748 FieldGeneratorType::RandomBoolean => {
749 Ok(Value::Bool(crate::generators::random_bool()))
750 },
751 FieldGeneratorType::CurrentTimestamp => {
752 Ok(Value::String(datetime::iso8601()))
753 },
754 FieldGeneratorType::RandomDate => {
755 Ok(Value::String(datetime::date_iso()))
756 },
757 FieldGeneratorType::RandomDateTime => {
758 Ok(Value::String(datetime::iso8601()))
759 },
760 FieldGeneratorType::RandomEmail => {
761 Ok(Value::String(crate::generators::internet::email()))
762 },
763 FieldGeneratorType::RandomPhone => {
764 Ok(Value::String(crate::generators::number::phone_number_cn()))
765 },
766 FieldGeneratorType::RandomUrl => {
767 Ok(Value::String(crate::generators::internet::url()))
768 },
769 FieldGeneratorType::Uuid => {
770 Ok(Value::String(crate::generators::uuid_v4()))
771 },
772 FieldGeneratorType::Name => {
773 Ok(Value::String(crate::generators::name::zh_cn_fullname()))
774 },
775 FieldGeneratorType::CompanyName => {
776 Ok(Value::String(format!("Company {}", crate::generators::random_int(1, 1000))))
777 },
778 FieldGeneratorType::Address => {
779 Ok(Value::String(format!("{} Street {}",
780 crate::generators::address::zh_city(),
781 crate::generators::random_int(1, 1000))))
782 },
783 FieldGeneratorType::ProductName => {
784 let products = ["Laptop", "Smartphone", "Tablet", "Watch", "Headphones", "Speaker"];
785 let product = products[crate::generators::random_int(0, products.len() as i32 - 1) as usize];
786 Ok(Value::String(format!("{} {}", product, crate::generators::random_int(1, 1000))))
787 },
788 FieldGeneratorType::OrderStatus => {
789 let statuses = ["Pending", "Processing", "Shipped", "Delivered", "Cancelled"];
790 let status = statuses[crate::generators::random_int(0, statuses.len() as i32 - 1) as usize];
791 Ok(Value::String(status.to_string()))
792 },
793 }
794 }
795
796 pub fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<usize> {
798 if data.is_empty() {
799 return Ok(0);
800 }
801
802 let start_time = std::time::Instant::now();
803 let mut total_inserted = 0;
804
805 for chunk in data.chunks(self.config.batch_size) {
807 let inserted = self.insert_chunk(table_name, chunk)?;
808 total_inserted += inserted;
809 }
810
811 self.stats.successful_rows += total_inserted;
813 self.stats.processing_time_ms += start_time.elapsed().as_millis() as u64;
814 self.stats.calculate_speed();
815
816 Ok(total_inserted)
817 }
818
819 fn insert_chunk(&self, table_name: &str, chunk: &[HashMap<String, Value>]) -> Result<usize> {
821 if chunk.is_empty() {
822 return Ok(0);
823 }
824
825 let rt = tokio::runtime::Runtime::new()
827 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
828
829 rt.block_on(async {
830 let pool = sqlx::mysql::MySqlPoolOptions::new()
831 .max_connections(1)
832 .connect(&self.connection_string)
833 .await
834 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
835
836 let fields: Vec<String> = chunk[0].keys().cloned().collect();
838 let field_list = fields.join(", ");
839 let placeholders = fields.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
840
841 let sql = format!("INSERT INTO {} ({}) VALUES ({})", table_name, field_list, placeholders);
842
843 let mut transaction = pool.begin().await
844 .map_err(|e| DataForgeError::database(&format!("Failed to start transaction: {}", e)))?;
845
846 for row in chunk {
847 let mut query = sqlx::query(&sql);
848
849 for field in &fields {
850 let value = row.get(field).unwrap_or(&Value::Null);
851 query = match value {
852 Value::String(s) => query.bind(s),
853 Value::Number(n) => {
854 if let Some(i) = n.as_i64() {
855 query.bind(i)
856 } else if let Some(f) = n.as_f64() {
857 query.bind(f)
858 } else {
859 query.bind(n.to_string())
860 }
861 },
862 Value::Bool(b) => query.bind(*b),
863 Value::Null => query.bind(Option::<String>::None),
864 _ => query.bind(value.to_string()),
865 };
866 }
867
868 query.execute(&mut *transaction).await
869 .map_err(|e| DataForgeError::database(&format!("Failed to execute insert: {}", e)))?;
870 }
871
872 transaction.commit().await
873 .map_err(|e| DataForgeError::database(&format!("Failed to commit transaction: {}", e)))?;
874
875 pool.close().await;
876 Ok(chunk.len())
877 })
878 }
879
880}
881
882impl DatabaseFiller for MySqlFillerSync {
883 fn create_table(&mut self, schema: &TableSchema) -> Result<()> {
884 let rt = tokio::runtime::Runtime::new()
886 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
887
888 rt.block_on(async {
889 let pool = sqlx::mysql::MySqlPoolOptions::new()
890 .max_connections(1)
891 .connect(&self.connection_string)
892 .await
893 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
894
895 let sql = self.generate_create_table_sql(schema)?;
897
898 sqlx::query(&sql)
899 .execute(&pool)
900 .await
901 .map_err(|e| DataForgeError::database(&format!("Failed to create table: {}", e)))?;
902
903 pool.close().await;
904 Ok(())
905 })
906 }
907
908 fn insert_data(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
909 self.batch_insert(table_name, data)?;
911 Ok(())
912 }
913
914 fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
915 let _inserted = MySqlFillerSync::batch_insert(self, table_name, data)?;
917 Ok(())
918 }
919
920 fn get_stats(&self) -> FillingStats {
921 self.stats.clone()
922 }
923
924 fn truncate_table(&mut self, table_name: &str) -> Result<()> {
925 let rt = tokio::runtime::Runtime::new()
927 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
928
929 rt.block_on(async {
930 let pool = sqlx::mysql::MySqlPoolOptions::new()
931 .max_connections(1)
932 .connect(&self.connection_string)
933 .await
934 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
935
936 let sql = format!("TRUNCATE TABLE {}", table_name);
937 sqlx::query(&sql)
938 .execute(&pool)
939 .await
940 .map_err(|e| DataForgeError::database(&format!("Failed to truncate table: {}", e)))?;
941
942 pool.close().await;
943 Ok(())
944 })
945 }
946
947 fn drop_table(&mut self, table_name: &str) -> Result<()> {
948 let rt = tokio::runtime::Runtime::new()
950 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
951
952 rt.block_on(async {
953 let pool = sqlx::mysql::MySqlPoolOptions::new()
954 .max_connections(1)
955 .connect(&self.connection_string)
956 .await
957 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
958
959 let sql = format!("DROP TABLE IF EXISTS {}", table_name);
960 sqlx::query(&sql)
961 .execute(&pool)
962 .await
963 .map_err(|e| DataForgeError::database(&format!("Failed to drop table: {}", e)))?;
964
965 pool.close().await;
966 Ok(())
967 })
968 }
969}
970
971#[cfg(test)]
972mod tests {
973 use super::*;
974 use crate::filling::ConnectionConfig;
975 use crate::{forge, name};
976
977 #[test]
978 fn test_mysql_filler_sync_creation() {
979 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
980 .with_port(13307);
981 let filling_config = FillingConfig::default();
982
983 let filler = MySqlFillerSync::from_config(config, filling_config);
984 assert!(filler.is_ok());
985 }
986
987 #[test]
988 fn test_mysql_connection_string() {
989 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
990 .with_port(13307);
991
992 let connection_string = config.to_connection_string().unwrap();
993 assert!(connection_string.contains("mysql://root:Aa123456@localhost:13307/demo"));
994 }
995
996 #[test]
997 fn test_generate_sample_employee() {
998 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
999 .with_port(13307);
1000 let filling_config = FillingConfig::default();
1001 let filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1002
1003 let employee = generate_sample_employee(1);
1004 assert!(employee.is_object());
1005 assert!(!employee.as_object().unwrap().is_empty());
1008 }
1009
1010 #[test]
1011 fn test_generate_sample_employee_with_actual_schema() {
1012 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1013 .with_port(13307);
1014 let filling_config = FillingConfig::default();
1015 let filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1016
1017 let _schema_result = filler.get_table_schema("employees");
1019 let employee = generate_sample_employee(1);
1024 assert!(employee.is_object());
1025 assert!(!employee.as_object().unwrap().is_empty());
1027 }
1028
1029 #[test]
1030 fn test_dynamic_schema_generation() {
1031 let custom_schema = TableSchema {
1033 name: "custom_employees".to_string(),
1034 fields: vec![
1035 FieldSchema {
1036 name: "employee_id".to_string(),
1037 data_type: DataType::Integer { min: None, max: None },
1038 constraints: crate::db::schema::FieldConstraints {
1039 nullable: false,
1040 unique: true,
1041 default: None,
1042 pattern: None,
1043 min: None,
1044 max: None,
1045 },
1046 description: None,
1047 generator_config: None,
1048 generator_type: None,
1049 },
1050 FieldSchema {
1051 name: "full_name".to_string(),
1052 data_type: DataType::String { max_length: Some(100) },
1053 constraints: crate::db::schema::FieldConstraints {
1054 nullable: false,
1055 unique: false,
1056 default: None,
1057 pattern: None,
1058 min: None,
1059 max: None,
1060 },
1061 description: None,
1062 generator_config: None,
1063 generator_type: None,
1064 },
1065 FieldSchema {
1066 name: "birth_date".to_string(),
1067 data_type: DataType::Date { format: None },
1068 constraints: crate::db::schema::FieldConstraints {
1069 nullable: false,
1070 unique: false,
1071 default: None,
1072 pattern: None,
1073 min: None,
1074 max: None,
1075 },
1076 description: None,
1077 generator_config: None,
1078 generator_type: None,
1079 },
1080 FieldSchema {
1081 name: "salary".to_string(),
1082 data_type: DataType::Integer { min: Some(0), max: Some(1000000) },
1083 constraints: crate::db::schema::FieldConstraints {
1084 nullable: false,
1085 unique: false,
1086 default: None,
1087 pattern: None,
1088 min: None,
1089 max: None,
1090 },
1091 description: None,
1092 generator_config: None,
1093 generator_type: None,
1094 },
1095 ],
1096 primary_key: Some(vec!["employee_id".to_string()]),
1097 indexes: vec![],
1098 description: None,
1099 };
1100
1101 let employee_data = crate::filling::utils::generate_sample_data_by_schema(&custom_schema, 1);
1103
1104 assert!(employee_data.is_object());
1106 let employee_obj = employee_data.as_object().unwrap();
1107
1108 assert!(employee_obj.contains_key("employee_id"));
1110 assert!(employee_obj.contains_key("full_name"));
1111 assert!(employee_obj.contains_key("birth_date"));
1112 assert!(employee_obj.contains_key("salary"));
1113
1114 assert!(employee_obj.get("employee_id").unwrap().is_number());
1116 assert!(employee_obj.get("full_name").unwrap().is_string());
1117 assert!(employee_obj.get("birth_date").unwrap().is_string());
1118 assert!(employee_obj.get("salary").unwrap().is_number());
1119 }
1120
1121 #[test]
1122 fn test_fill_employees_table() {
1123 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1124 .with_port(13307);
1125 let filling_config = FillingConfig::default();
1126 let mut filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1127
1128 let result = filler.fill_table("employees", 10);
1129 match &result {
1130 Ok(count) => {
1131 println!("成功插入 {} 条记录", count);
1132 },
1133 Err(e) => {
1134 println!("填充数据失败: {}", e);
1135 return;
1138 }
1139 }
1140
1141 assert!(result.is_ok());
1142 assert_eq!(result.unwrap(), 10);
1143
1144 let stats = filler.get_stats();
1145 assert_eq!(stats.successful_rows, 10);
1146 }
1147
1148 #[test]
1149 fn test_table_schema() {
1150 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1151 .with_port(13307);
1152 let filling_config = FillingConfig::default();
1153 let filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1154
1155 let schema = filler.get_table_schema("employees");
1156 match &schema {
1157 Ok(s) => {
1158 println!("成功获取表结构: {}", s.name);
1159 println!("字段数: {}", s.fields.len());
1160 },
1161 Err(e) => {
1162 println!("获取表结构失败: {}", e);
1163 return;
1166 }
1167 }
1168
1169 assert!(schema.is_ok());
1170
1171 let schema = schema.unwrap();
1172 assert_eq!(schema.name, "employees");
1173 assert!(!schema.fields.is_empty());
1174 }
1175
1176 #[test]
1177 fn test_batch_insert() {
1178 let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1179 .with_port(13307);
1180 let filling_config = FillingConfig::default();
1181 let mut filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1182
1183 let mut test_data = Vec::new();
1184 for i in 1..=5 {
1185 let mut row = HashMap::new();
1186 row.insert("emp_no".to_string(), Value::Number((100000 + i).into()));
1187 row.insert("first_name".to_string(), Value::String(format!("First{}", i)));
1188 row.insert("last_name".to_string(), Value::String(format!("Last{}", i)));
1189 test_data.push(row);
1190 }
1191
1192 let result = filler.batch_insert("employees", test_data);
1193 match &result {
1195 Ok(count) => {
1196 assert_eq!(*count, 5);
1197 },
1198 Err(e) => {
1199 println!("数据库连接失败(在预期中): {}", e);
1201 }
1204 }
1205 }
1206
1207 fn generate_sample_employee(index: usize) -> Value {
1209 let connection_config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1210 .with_port(13307)
1211 .with_param("charset", "utf8mb4");
1212
1213 let filling_config = FillingConfig {
1214 batch_size: 100,
1215 use_transaction: true,
1216 timeout_seconds: 60,
1217 retry_count: 2,
1218 };
1219
1220 let mut filler = MySqlFillerSync::from_config(connection_config, filling_config).unwrap();
1221
1222 match filler.get_table_schema("employees") {
1224 Ok(schema) => {
1225 crate::filling::utils::generate_sample_data_by_schema(&schema, index)
1227 },
1228 Err(_) => {
1229 let generic_schema = TableSchema {
1232 name: "employees".to_string(),
1233 fields: vec![
1234 FieldSchema {
1235 name: "id".to_string(),
1236 data_type: DataType::Integer { min: None, max: None },
1237 constraints: crate::db::schema::FieldConstraints {
1238 nullable: false,
1239 unique: true,
1240 default: None,
1241 pattern: None,
1242 min: None,
1243 max: None,
1244 },
1245 description: None,
1246 generator_config: None,
1247 generator_type: None,
1248 },
1249 ],
1250 primary_key: Some(vec!["id".to_string()]),
1251 indexes: vec![],
1252 description: None,
1253 };
1254
1255 crate::filling::utils::generate_sample_data_by_schema(&generic_schema, index)
1256 }
1257 }
1258 }
1259
1260}