dataforge/filling/
mysql.rs

1//! MySQL 数据库填充实现
2
3#[cfg(feature = "database")]
4use sqlx::{MySql, Pool, Row};
5use std::collections::HashMap;
6use librarys::random_letters;
7use serde_json::Value;
8use crate::error::{DataForgeError, Result};
9use crate::generators::{number, datetime, internet};
10use crate::db::schema::{TableSchema, FieldSchema, DataType, FieldGeneratorType};
11use super::{DatabaseFiller, FillingConfig, FillingStats, ConnectionConfig};
12
/// MySQL-specific data filler (asynchronous).
///
/// Bundles the sqlx connection pool with the filling configuration and the
/// statistics that `batch_insert` accumulates.
#[cfg(feature = "database")]
pub struct MySqlFiller {
    // Connection pool every query in this type runs against.
    pool: Pool<MySql>,
    // Filling options; `batch_size` is read when inserts are split into chunks.
    config: FillingConfig,
    // Running counters (`successful_rows`, `processing_time_ms`) updated by `batch_insert`.
    stats: FillingStats,
}
20
21#[cfg(feature = "database")]
22impl MySqlFiller {
23    /// 创建新的 MySQL 填充器
24    pub async fn new(connection_string: &str, config: FillingConfig) -> Result<Self> {
25        let pool = sqlx::mysql::MySqlPoolOptions::new()
26            .max_connections(10)
27            .connect(connection_string)
28            .await
29            .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
30
31        Ok(Self {
32            pool,
33            config,
34            stats: FillingStats::default(),
35        })
36    }
37
38    /// 从连接配置创建
39    pub async fn from_config(config: ConnectionConfig, filling_config: FillingConfig) -> Result<Self> {
40        let connection_string = config.to_connection_string()?;
41        Self::new(&connection_string, filling_config).await
42    }
43
44    /// 检查表是否存在
45    pub async fn table_exists(&self, table_name: &str) -> Result<bool> {
46        let query = "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = ?";
47        let row = sqlx::query(query)
48            .bind(table_name)
49            .fetch_one(&self.pool)
50            .await
51            .map_err(|e| DataForgeError::database(&format!("Failed to check table existence: {}", e)))?;
52        
53        let count: i64 = row.try_get("count")
54            .map_err(|e| DataForgeError::database(&format!("Failed to get count: {}", e)))?;
55        
56        Ok(count > 0)
57    }
58
59    /// 获取表结构
60    pub async fn get_table_schema(&self, table_name: &str) -> Result<TableSchema> {
61        let query = r#"
62            SELECT 
63                COLUMN_NAME as column_name,
64                DATA_TYPE as data_type,
65                IS_NULLABLE as is_nullable,
66                COLUMN_DEFAULT as column_default,
67                CHARACTER_MAXIMUM_LENGTH as max_length,
68                NUMERIC_PRECISION as numeric_precision,
69                NUMERIC_SCALE as numeric_scale
70            FROM information_schema.COLUMNS 
71            WHERE table_schema = DATABASE() AND table_name = ?
72            ORDER BY ORDINAL_POSITION
73        "#;
74
75        let rows = sqlx::query(query)
76            .bind(table_name)
77            .fetch_all(&self.pool)
78            .await
79            .map_err(|e| DataForgeError::database(&format!("Failed to get table schema: {}", e)))?;
80
81        let mut fields = Vec::new();
82        for row in rows {
83            let column_name: String = row.try_get("column_name")?;
84            let data_type_str: String = row.try_get("data_type")?;
85            let is_nullable: String = row.try_get("is_nullable")?;
86            let max_length: Option<u32> = row.try_get("max_length").ok();
87
88            let data_type = self.mysql_type_to_datatype(&data_type_str, max_length);
89            let nullable = is_nullable.to_uppercase() == "YES";
90
91            fields.push(FieldSchema {
92                name: column_name,
93                data_type,
94                constraints: crate::db::schema::FieldConstraints {
95                    nullable,
96                    unique: false,
97                    default: None,
98                    pattern: None,
99                    min: None,
100                    max: None,
101                },
102                description: None,
103                generator_config: None,
104                generator_type: None,
105            });
106        }
107
108        Ok(TableSchema {
109            name: table_name.to_string(),
110            fields,
111            primary_key: None,
112            indexes: vec![],
113            description: None,
114        })
115    }
116
117    /// 填充指定表的随机数据
118    pub async fn fill_table(&mut self, table_name: &str, count: usize) -> Result<usize> {
119        // 检查表是否存在
120        if !self.table_exists(table_name).await? {
121            return Err(DataForgeError::database(&format!("Table '{}' does not exist", table_name)));
122        }
123
124        // 获取表结构
125        let schema = self.get_table_schema(table_name).await?;
126        
127        // 生成数据
128        let data = self.generate_table_data(count, &schema)?;
129        
130        // 插入数据
131        self.batch_insert(table_name, data).await
132    }
133
134    /// 根据表结构生成数据
135    fn generate_table_data(&self, count: usize, schema: &TableSchema) -> Result<Vec<HashMap<String, Value>>> {
136        let mut data = Vec::with_capacity(count);
137
138        // 通用数据生成逻辑,根据 schema 生成
139        for _i in 1..=count {
140            let mut row = HashMap::new();
141            
142            for field in &schema.fields {
143                let value = self.generate_value_for_field(field)?;
144                row.insert(field.name.clone(), value);
145            }
146            
147            data.push(row);
148        }
149        
150        Ok(data)
151    }
152    
153    /// MySQL 类型转换为内部数据类型
154    fn mysql_type_to_datatype(&self, mysql_type: &str, max_length: Option<u32>) -> DataType {
155        match mysql_type.to_lowercase().as_str() {
156            "varchar" | "char" | "text" | "tinytext" | "mediumtext" | "longtext" => {
157                DataType::String { max_length: max_length.map(|l| l as usize) }
158            },
159            "int" | "tinyint" | "smallint" | "mediumint" | "bigint" => {
160                DataType::Integer { min: None, max: None }
161            },
162            "float" | "double" | "decimal" => {
163                DataType::Float { min: None, max: None, precision: None }
164            },
165            "boolean" | "bool" => DataType::Boolean,
166            "date" => DataType::Date { format: None },
167            "datetime" | "timestamp" => DataType::DateTime { format: None },
168            "time" => DataType::Time { format: None },
169            "json" => DataType::Json,
170            _ => DataType::String { max_length: Some(255) },
171        }
172    }
173    
174    /// 生成创建表的SQL语句
175    #[allow(dead_code)]
176    fn generate_create_table_sql(&self, schema: &TableSchema) -> Result<String> {
177        use crate::db::schema::DataType;
178        
179        let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
180        
181        let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
182            let data_type = match &field.data_type {
183                DataType::String { max_length } => {
184                    format!("VARCHAR({})", max_length.unwrap_or(255))
185                },
186                DataType::Integer { .. } => "INT".to_string(),
187                DataType::Float { .. } => "FLOAT".to_string(),
188                DataType::Boolean => "BOOLEAN".to_string(),
189                DataType::DateTime { .. } => "DATETIME".to_string(),
190                DataType::Date { .. } => "DATE".to_string(),
191                DataType::Email => "VARCHAR(255)".to_string(),
192                DataType::Phone { .. } => "VARCHAR(20)".to_string(),
193                _ => "TEXT".to_string(),
194            };
195            
196            let mut def = format!("{} {}", field.name, data_type);
197            
198            if !field.constraints.nullable {
199                def.push_str(" NOT NULL");
200            }
201            
202            if field.constraints.unique {
203                def.push_str(" UNIQUE");
204            }
205            
206            def
207        }).collect();
208        
209        sql.push_str(&field_definitions.join(", "));
210        
211        if let Some(ref pk) = schema.primary_key {
212            sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
213        }
214        
215        sql.push_str(")");
216        Ok(sql)
217    }
218
219    /// 根据字段类型生成值
220    fn generate_value_for_field(&self, field: &FieldSchema) -> Result<Value> {
221        // 首先检查是否有指定的生成器类型
222        if let Some(generator_type) = &field.generator_type {
223            return self.generate_value_by_generator_type(field, generator_type);
224        }
225        
226        // 如果没有指定生成器类型,则根据数据类型自动生成
227        match &field.data_type {
228            DataType::String { max_length } => {
229                let max_len = max_length.unwrap_or(255) as usize;
230                let len = std::cmp::min(10, max_len);
231                Ok(Value::String(random_letters(len)))
232            },
233            DataType::Integer { min, max } => {
234                let min_val = min.unwrap_or(0);
235                let max_val = max.unwrap_or(1000000);
236                Ok(Value::Number(serde_json::Number::from(number::random_int(min_val as i32, max_val as i32))))
237            },
238            DataType::Float { .. } => {
239                Ok(Value::Number(serde_json::Number::from_f64(number::random_float(0.0, 100.0)).unwrap()))
240            },
241            DataType::Boolean => {
242                Ok(Value::Bool(rand::random()))
243            },
244            DataType::DateTime { .. } => {
245                Ok(Value::String(datetime::iso8601()))
246            },
247            DataType::Date { .. } => {
248                Ok(Value::String(datetime::date_iso()))
249            },
250            DataType::Email => {
251                Ok(Value::String(internet::email()))
252            },
253            DataType::Phone { .. } => {
254                Ok(Value::String(number::phone_number_cn()))
255            },
256            _ => {
257                Ok(Value::String("default_value".to_string()))
258            }
259        }
260    }
261    
262    /// 根据指定的生成器类型生成值
263    fn generate_value_by_generator_type(&self, field: &FieldSchema, generator_type: &FieldGeneratorType) -> Result<Value> {
264        match generator_type {
265            FieldGeneratorType::Default => {
266                // 使用默认生成逻辑
267                self.generate_value_for_field(field)
268            },
269            FieldGeneratorType::Custom(generator_name) => {
270                // 使用自定义生成器(这里简化处理,实际项目中可能需要更复杂的逻辑)
271                Ok(Value::String(format!("custom_{}", generator_name)))
272            },
273            FieldGeneratorType::RandomString => {
274                let len = match &field.data_type {
275                    DataType::String { max_length } => max_length.unwrap_or(10),
276                    _ => 10,
277                };
278                Ok(Value::String(random_letters(len as usize)))
279            },
280            FieldGeneratorType::RandomInteger => {
281                Ok(Value::Number(serde_json::Number::from(crate::generators::random_int(0, 1000000))))
282            },
283            FieldGeneratorType::RandomFloat => {
284                Ok(Value::Number(serde_json::Number::from_f64(crate::generators::random_float(0.0, 1000.0)).unwrap()))
285            },
286            FieldGeneratorType::RandomBoolean => {
287                Ok(Value::Bool(crate::generators::random_bool()))
288            },
289            FieldGeneratorType::CurrentTimestamp => {
290                Ok(Value::String(datetime::iso8601()))
291            },
292            FieldGeneratorType::RandomDate => {
293                Ok(Value::String(datetime::date_iso()))
294            },
295            FieldGeneratorType::RandomDateTime => {
296                Ok(Value::String(datetime::iso8601()))
297            },
298            FieldGeneratorType::RandomEmail => {
299                Ok(Value::String(crate::generators::internet::email()))
300            },
301            FieldGeneratorType::RandomPhone => {
302                Ok(Value::String(crate::generators::number::phone_number_cn()))
303            },
304            FieldGeneratorType::RandomUrl => {
305                Ok(Value::String(crate::generators::internet::url()))
306            },
307            FieldGeneratorType::Uuid => {
308                Ok(Value::String(crate::generators::uuid_v4()))
309            },
310            FieldGeneratorType::Name => {
311                // 根据语言设置生成相应语言的姓名
312                match crate::generation::DataForge::default().language() {
313                    crate::generation::Language::ZhCN => {
314                        Ok(Value::String(crate::generators::name::zh_cn_fullname()))
315                    },
316                    crate::generation::Language::EnUS => {
317                        Ok(Value::String(crate::generators::name::en_us_fullname()))
318                    },
319                    crate::generation::Language::JaJP => {
320                        Ok(Value::String(crate::generators::name::ja_jp_fullname()))
321                    },
322                }
323            },
324            FieldGeneratorType::CompanyName => {
325                Ok(Value::String(format!("Company {}", crate::generators::random_int(1, 1000))))
326            },
327            FieldGeneratorType::Address => {
328                // 根据语言设置生成相应语言的地址
329                match crate::generation::DataForge::default().language() {
330                    crate::generation::Language::ZhCN => {
331                        Ok(Value::String(format!("{}市{}区{}街道{}", 
332                            crate::generators::address::zh_city(), 
333                            crate::generators::address::zh_city(), 
334                            crate::generators::random_int(1, 100),
335                            crate::generators::random_int(1, 1000))))
336                    },
337                    crate::generation::Language::EnUS => {
338                        Ok(Value::String(format!("{} Street {}, {}, {}", 
339                            crate::generators::address::us_city(),
340                            crate::generators::random_int(1, 1000),
341                            crate::generators::address::us_city(),
342                            crate::generators::address::us_state())))
343                    },
344                    crate::generation::Language::JaJP => {
345                        Ok(Value::String(format!("{}{}{}丁目{}", 
346                            crate::generators::address::us_city(),  // 使用us_city作为替代
347                            crate::generators::address::us_city(),  // 使用us_city作为替代
348                            crate::generators::random_int(1, 20),
349                            crate::generators::random_int(1, 20))))
350                    },
351                }
352            },
353            FieldGeneratorType::ProductName => {
354                let products = ["Laptop", "Smartphone", "Tablet", "Watch", "Headphones", "Speaker"];
355                let product = products[crate::generators::random_int(0, products.len() as i32 - 1) as usize];
356                Ok(Value::String(format!("{} {}", product, crate::generators::random_int(1, 1000))))
357            },
358            FieldGeneratorType::OrderStatus => {
359                let statuses = ["Pending", "Processing", "Shipped", "Delivered", "Cancelled"];
360                let status = statuses[crate::generators::random_int(0, statuses.len() as i32 - 1) as usize];
361                Ok(Value::String(status.to_string()))
362            },
363        }
364    }
365
366    /// 批量插入数据(异步版本)
367    pub async fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<usize> {
368        if data.is_empty() {
369            return Ok(0);
370        }
371
372        let start_time = std::time::Instant::now();
373        let mut total_inserted = 0;
374
375        // 分批插入
376        for chunk in data.chunks(self.config.batch_size) {
377            let inserted = self.insert_chunk(table_name, chunk).await?;
378            total_inserted += inserted;
379        }
380
381        // 更新统计信息
382        self.stats.successful_rows += total_inserted;
383        self.stats.processing_time_ms += start_time.elapsed().as_millis() as u64;
384        self.stats.calculate_speed();
385
386        Ok(total_inserted)
387    }
388
389    /// 插入单个数据块
390    async fn insert_chunk(&self, table_name: &str, chunk: &[HashMap<String, Value>]) -> Result<usize> {
391        if chunk.is_empty() {
392            return Ok(0);
393        }
394
395        // 获取所有字段名
396        let fields: Vec<String> = chunk[0].keys().cloned().collect();
397        let field_list = fields.join(", ");
398        let placeholders = fields.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
399        
400        let sql = format!("INSERT INTO {} ({}) VALUES ({})", table_name, field_list, placeholders);
401
402        let mut transaction = self.pool.begin().await
403            .map_err(|e| DataForgeError::database(&format!("Failed to start transaction: {}", e)))?;
404
405        for row in chunk {
406            let mut query = sqlx::query(&sql);
407            
408            for field in &fields {
409                let value = row.get(field).unwrap_or(&Value::Null);
410                query = match value {
411                    Value::String(s) => query.bind(s),
412                    Value::Number(n) => {
413                        if let Some(i) = n.as_i64() {
414                            query.bind(i)
415                        } else if let Some(f) = n.as_f64() {
416                            query.bind(f)
417                        } else {
418                            query.bind(n.to_string())
419                        }
420                    },
421                    Value::Bool(b) => query.bind(*b),
422                    Value::Null => query.bind(Option::<String>::None),
423                    _ => query.bind(value.to_string()),
424                };
425            }
426            
427            query.execute(&mut *transaction).await
428                .map_err(|e| DataForgeError::database(&format!("Failed to execute insert: {}", e)))?;
429        }
430
431        transaction.commit().await
432            .map_err(|e| DataForgeError::database(&format!("Failed to commit transaction: {}", e)))?;
433
434        Ok(chunk.len())
435    }
436}
437
/// Synchronous wrapper around the MySQL filler.
///
/// Stores only the connection string; the methods visible in this file
/// (`table_exists`, `get_table_schema`) create a Tokio runtime and a
/// one-connection pool on each call.
pub struct MySqlFillerSync {
    // Connection string used to open a pool on every call.
    pub connection_string: String,
    // Filling options.
    pub config: FillingConfig,
    // Statistics of the most recent `fill_table` run.
    pub stats: FillingStats,
}
444
445impl MySqlFillerSync {
446    /// 创建同步版本的 MySQL 填充器
447    pub fn new(connection_string: String, config: FillingConfig) -> Self {
448        Self {
449            connection_string,
450            config,
451            stats: FillingStats::default(),
452        }
453    }
454
455    /// 从连接配置创建
456    pub fn from_config(config: ConnectionConfig, filling_config: FillingConfig) -> Result<Self> {
457        let connection_string = config.to_connection_string()?;
458        Ok(Self::new(connection_string, filling_config))
459    }
460    
461    /// 生成创建表的SQL语句
462    fn generate_create_table_sql(&self, schema: &TableSchema) -> Result<String> {
463        use crate::db::schema::DataType;
464        
465        let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
466        
467        let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
468            let data_type = match &field.data_type {
469                DataType::String { max_length } => {
470                    format!("VARCHAR({})", max_length.unwrap_or(255))
471                },
472                DataType::Integer { .. } => "INT".to_string(),
473                DataType::Float { .. } => "FLOAT".to_string(),
474                DataType::Boolean => "BOOLEAN".to_string(),
475                DataType::DateTime { .. } => "DATETIME".to_string(),
476                DataType::Date { .. } => "DATE".to_string(),
477                DataType::Email => "VARCHAR(255)".to_string(),
478                DataType::Phone { .. } => "VARCHAR(20)".to_string(),
479                _ => "TEXT".to_string(),
480            };
481            
482            let mut def = format!("{} {}", field.name, data_type);
483            
484            if !field.constraints.nullable {
485                def.push_str(" NOT NULL");
486            }
487            
488            if field.constraints.unique {
489                def.push_str(" UNIQUE");
490            }
491            
492            def
493        }).collect();
494        
495        sql.push_str(&field_definitions.join(", "));
496        
497        if let Some(ref pk) = schema.primary_key {
498            sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
499        }
500        
501        sql.push_str(")");
502        Ok(sql)
503    }
504
505    /// 检查表是否存在
506    pub fn table_exists(&self, table_name: &str) -> Result<bool> {
507        // 使用同步的运行时来执行异步操作
508        let rt = tokio::runtime::Runtime::new()
509            .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
510        
511        rt.block_on(async {
512            let pool = sqlx::mysql::MySqlPoolOptions::new()
513                .max_connections(1)
514                .connect(&self.connection_string)
515                .await
516                .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
517            
518            let query = "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = ?";
519            let row = sqlx::query(query)
520                .bind(table_name)
521                .fetch_one(&pool)
522                .await
523                .map_err(|e| DataForgeError::database(&format!("Failed to check table existence: {}", e)))?;
524            
525            let count: i64 = row.try_get("count")
526                .map_err(|e| DataForgeError::database(&format!("Failed to get count: {}", e)))?;
527            
528            pool.close().await;
529            Ok(count > 0)
530        })
531    }
532
533    /// 获取表结构
534    pub fn get_table_schema(&self, table_name: &str) -> Result<TableSchema> {
535        // 使用同步的运行时来执行异步操作
536        let rt = tokio::runtime::Runtime::new()
537            .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
538        
539        rt.block_on(async {
540            let pool = sqlx::mysql::MySqlPoolOptions::new()
541                .max_connections(1)
542                .connect(&self.connection_string)
543                .await
544                .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
545            
546            let query = r#"
547                SELECT 
548                    COLUMN_NAME as column_name,
549                    DATA_TYPE as data_type,
550                    IS_NULLABLE as is_nullable,
551                    COLUMN_DEFAULT as column_default,
552                    CHARACTER_MAXIMUM_LENGTH as max_length,
553                    NUMERIC_PRECISION as numeric_precision,
554                    NUMERIC_SCALE as numeric_scale
555                FROM information_schema.COLUMNS 
556                WHERE table_schema = DATABASE() AND table_name = ?
557                ORDER BY ORDINAL_POSITION
558            "#;
559
560            let rows = sqlx::query(query)
561                .bind(table_name)
562                .fetch_all(&pool)
563                .await
564                .map_err(|e| DataForgeError::database(&format!("Failed to get table schema: {}", e)))?;
565
566            let mut fields = Vec::new();
567            for row in rows {
568                let column_name: String = row.try_get("column_name")
569                    .map_err(|e| DataForgeError::database(&format!("Failed to get column_name: {}", e)))?;
570                let data_type_str: String = row.try_get("data_type")
571                    .map_err(|e| DataForgeError::database(&format!("Failed to get data_type: {}", e)))?;
572                let is_nullable: String = row.try_get("is_nullable")
573                    .map_err(|e| DataForgeError::database(&format!("Failed to get is_nullable: {}", e)))?;
574                let max_length: Option<u32> = row.try_get("max_length").ok();
575
576                let data_type = self.mysql_type_to_datatype(&data_type_str, max_length);
577                let nullable = is_nullable.to_uppercase() == "YES";
578
579                fields.push(FieldSchema {
580                    name: column_name,
581                    data_type,
582                    constraints: crate::db::schema::FieldConstraints {
583                        nullable,
584                        unique: false,
585                        default: None,
586                        pattern: None,
587                        min: None,
588                        max: None,
589                    },
590                    description: None,
591                    generator_config: None,
592                    generator_type: None,
593                });
594            }
595            
596            pool.close().await;
597            
598            Ok(TableSchema {
599                name: table_name.to_string(),
600                fields,
601                primary_key: None,
602                indexes: vec![],
603                description: None,
604            })
605        })
606    }
607
608    /// MySQL 类型转换为内部数据类型
609    fn mysql_type_to_datatype(&self, mysql_type: &str, max_length: Option<u32>) -> DataType {
610        match mysql_type.to_lowercase().as_str() {
611            "varchar" | "char" | "text" | "tinytext" | "mediumtext" | "longtext" => {
612                DataType::String { max_length: max_length.map(|l| l as usize) }
613            },
614            "int" | "tinyint" | "smallint" | "mediumint" | "bigint" => {
615                DataType::Integer { min: None, max: None }
616            },
617            "float" | "double" | "decimal" => {
618                DataType::Float { min: None, max: None, precision: None }
619            },
620            "boolean" | "bool" => DataType::Boolean,
621            "date" => DataType::Date { format: None },
622            "datetime" | "timestamp" => DataType::DateTime { format: None },
623            "time" => DataType::Time { format: None },
624            "json" => DataType::Json,
625            _ => DataType::String { max_length: Some(255) },
626        }
627    }
628
629    /// 通用表填充方法
630    pub fn fill_table(&mut self, table_name: &str, count: usize) -> Result<usize> {
631        let start_time = std::time::Instant::now();
632        
633        // 检查表是否存在
634        if !self.table_exists(table_name)? {
635            return Err(DataForgeError::database(&format!("Table '{}' does not exist", table_name)));
636        }
637
638        // 获取表结构
639        let schema = self.get_table_schema(table_name)?;
640        
641        // 生成数据
642        let data = self.generate_table_data(count, &schema)?;
643        
644        // 执行真正的批量插入
645        let inserted = self.batch_insert(table_name, data)?;
646        
647        // 更新统计信息
648        self.stats.total_rows = count;
649        self.stats.successful_rows = inserted;
650        self.stats.processing_time_ms = start_time.elapsed().as_millis() as u64;
651        self.stats.calculate_speed();
652
653        #[cfg(debug_assertions)]
654        {
655            println!("成功插入 {} 条记录到表 {}", inserted, table_name);
656            println!("处理时间: {}ms", self.stats.processing_time_ms);
657        }
658
659        Ok(inserted)
660    }
661
662    /// 根据表结构生成数据
663    fn generate_table_data(&self, count: usize, schema: &TableSchema) -> Result<Vec<HashMap<String, Value>>> {
664        let mut data = Vec::with_capacity(count);
665
666        // 通用数据生成逻辑,根据 schema 生成
667        for _i in 1..=count {
668            let mut row = HashMap::new();
669            
670            for field in &schema.fields {
671                let value = self.generate_value_for_field(field)?;
672                row.insert(field.name.clone(), value);
673            }
674            
675            data.push(row);
676        }
677        
678        Ok(data)
679    }
680
681    /// 根据字段类型生成值
682    fn generate_value_for_field(&self, field: &FieldSchema) -> Result<Value> {
683        // 首先检查是否有指定的生成器类型
684        if let Some(generator_type) = &field.generator_type {
685            return self.generate_value_by_generator_type(field, generator_type);
686        }
687        
688        // 如果没有指定生成器类型,则根据数据类型自动生成
689        match &field.data_type {
690            DataType::String { max_length } => {
691                let max_len = max_length.unwrap_or(255) as usize;
692                let len = std::cmp::min(10, max_len);
693                Ok(Value::String(random_letters(len)))
694            },
695            DataType::Integer { min, max } => {
696                let min_val = min.unwrap_or(0);
697                let max_val = max.unwrap_or(1000000);
698                Ok(Value::Number(serde_json::Number::from(number::random_int(min_val as i32, max_val as i32))))
699            },
700            DataType::Float { .. } => {
701                Ok(Value::Number(serde_json::Number::from_f64(number::random_float(0.0, 100.0)).unwrap()))
702            },
703            DataType::Boolean => {
704                Ok(Value::Bool(rand::random()))
705            },
706            DataType::DateTime { .. } => {
707                Ok(Value::String(datetime::iso8601()))
708            },
709            DataType::Date { .. } => {
710                Ok(Value::String(datetime::date_iso()))
711            },
712            DataType::Email => {
713                Ok(Value::String(internet::email()))
714            },
715            DataType::Phone { .. } => {
716                Ok(Value::String(number::phone_number_cn()))
717            },
718            _ => {
719                Ok(Value::String("default_value".to_string()))
720            }
721        }
722    }
723    
724    /// 根据指定的生成器类型生成值
725    fn generate_value_by_generator_type(&self, field: &FieldSchema, generator_type: &FieldGeneratorType) -> Result<Value> {
726        match generator_type {
727            FieldGeneratorType::Default => {
728                // 使用默认生成逻辑
729                self.generate_value_for_field(field)
730            },
731            FieldGeneratorType::Custom(generator_name) => {
732                // 使用自定义生成器(这里简化处理,实际项目中可能需要更复杂的逻辑)
733                Ok(Value::String(format!("custom_{}", generator_name)))
734            },
735            FieldGeneratorType::RandomString => {
736                let len = match &field.data_type {
737                    DataType::String { max_length } => max_length.unwrap_or(10),
738                    _ => 10,
739                };
740                Ok(Value::String(random_letters(len as usize)))
741            },
742            FieldGeneratorType::RandomInteger => {
743                Ok(Value::Number(serde_json::Number::from(crate::generators::random_int(0, 1000000))))
744            },
745            FieldGeneratorType::RandomFloat => {
746                Ok(Value::Number(serde_json::Number::from_f64(crate::generators::random_float(0.0, 1000.0)).unwrap()))
747            },
748            FieldGeneratorType::RandomBoolean => {
749                Ok(Value::Bool(crate::generators::random_bool()))
750            },
751            FieldGeneratorType::CurrentTimestamp => {
752                Ok(Value::String(datetime::iso8601()))
753            },
754            FieldGeneratorType::RandomDate => {
755                Ok(Value::String(datetime::date_iso()))
756            },
757            FieldGeneratorType::RandomDateTime => {
758                Ok(Value::String(datetime::iso8601()))
759            },
760            FieldGeneratorType::RandomEmail => {
761                Ok(Value::String(crate::generators::internet::email()))
762            },
763            FieldGeneratorType::RandomPhone => {
764                Ok(Value::String(crate::generators::number::phone_number_cn()))
765            },
766            FieldGeneratorType::RandomUrl => {
767                Ok(Value::String(crate::generators::internet::url()))
768            },
769            FieldGeneratorType::Uuid => {
770                Ok(Value::String(crate::generators::uuid_v4()))
771            },
772            FieldGeneratorType::Name => {
773                Ok(Value::String(crate::generators::name::zh_cn_fullname()))
774            },
775            FieldGeneratorType::CompanyName => {
776                Ok(Value::String(format!("Company {}", crate::generators::random_int(1, 1000))))
777            },
778            FieldGeneratorType::Address => {
779                Ok(Value::String(format!("{} Street {}", 
780                    crate::generators::address::zh_city(), 
781                    crate::generators::random_int(1, 1000))))
782            },
783            FieldGeneratorType::ProductName => {
784                let products = ["Laptop", "Smartphone", "Tablet", "Watch", "Headphones", "Speaker"];
785                let product = products[crate::generators::random_int(0, products.len() as i32 - 1) as usize];
786                Ok(Value::String(format!("{} {}", product, crate::generators::random_int(1, 1000))))
787            },
788            FieldGeneratorType::OrderStatus => {
789                let statuses = ["Pending", "Processing", "Shipped", "Delivered", "Cancelled"];
790                let status = statuses[crate::generators::random_int(0, statuses.len() as i32 - 1) as usize];
791                Ok(Value::String(status.to_string()))
792            },
793        }
794    }
795
796    /// 批量插入数据(同步版本)
797    pub fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<usize> {
798        if data.is_empty() {
799            return Ok(0);
800        }
801
802        let start_time = std::time::Instant::now();
803        let mut total_inserted = 0;
804
805        // 分批插入
806        for chunk in data.chunks(self.config.batch_size) {
807            let inserted = self.insert_chunk(table_name, chunk)?;
808            total_inserted += inserted;
809        }
810
811        // 更新统计信息
812        self.stats.successful_rows += total_inserted;
813        self.stats.processing_time_ms += start_time.elapsed().as_millis() as u64;
814        self.stats.calculate_speed();
815
816        Ok(total_inserted)
817    }
818
819    /// 插入单个数据块
820    fn insert_chunk(&self, table_name: &str, chunk: &[HashMap<String, Value>]) -> Result<usize> {
821        if chunk.is_empty() {
822            return Ok(0);
823        }
824
825        // 使用同步的运行时来执行异步操作
826        let rt = tokio::runtime::Runtime::new()
827            .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
828        
829        rt.block_on(async {
830            let pool = sqlx::mysql::MySqlPoolOptions::new()
831                .max_connections(1)
832                .connect(&self.connection_string)
833                .await
834                .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
835            
836            // 获取所有字段名
837            let fields: Vec<String> = chunk[0].keys().cloned().collect();
838            let field_list = fields.join(", ");
839            let placeholders = fields.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
840            
841            let sql = format!("INSERT INTO {} ({}) VALUES ({})", table_name, field_list, placeholders);
842
843            let mut transaction = pool.begin().await
844                .map_err(|e| DataForgeError::database(&format!("Failed to start transaction: {}", e)))?;
845
846            for row in chunk {
847                let mut query = sqlx::query(&sql);
848                
849                for field in &fields {
850                    let value = row.get(field).unwrap_or(&Value::Null);
851                    query = match value {
852                        Value::String(s) => query.bind(s),
853                        Value::Number(n) => {
854                            if let Some(i) = n.as_i64() {
855                                query.bind(i)
856                            } else if let Some(f) = n.as_f64() {
857                                query.bind(f)
858                            } else {
859                                query.bind(n.to_string())
860                            }
861                        },
862                        Value::Bool(b) => query.bind(*b),
863                        Value::Null => query.bind(Option::<String>::None),
864                        _ => query.bind(value.to_string()),
865                    };
866                }
867                
868                query.execute(&mut *transaction).await
869                    .map_err(|e| DataForgeError::database(&format!("Failed to execute insert: {}", e)))?;
870            }
871
872            transaction.commit().await
873                .map_err(|e| DataForgeError::database(&format!("Failed to commit transaction: {}", e)))?;
874            
875            pool.close().await;
876            Ok(chunk.len())
877        })
878    }
879
880}
881
882impl DatabaseFiller for MySqlFillerSync {
883    fn create_table(&mut self, schema: &TableSchema) -> Result<()> {
884        // 使用真正的 SQL 执行创建表操作
885        let rt = tokio::runtime::Runtime::new()
886            .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
887        
888        rt.block_on(async {
889            let pool = sqlx::mysql::MySqlPoolOptions::new()
890                .max_connections(1)
891                .connect(&self.connection_string)
892                .await
893                .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
894            
895            // 生成 CREATE TABLE SQL 语句
896            let sql = self.generate_create_table_sql(schema)?;
897            
898            sqlx::query(&sql)
899                .execute(&pool)
900                .await
901                .map_err(|e| DataForgeError::database(&format!("Failed to create table: {}", e)))?;
902            
903            pool.close().await;
904            Ok(())
905        })
906    }
907
908    fn insert_data(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
909        // 使用真正的数据库插入
910        self.batch_insert(table_name, data)?;
911        Ok(())
912    }
913
914    fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
915        // 使用真正的批量插入(调用实例方法)
916        let _inserted = MySqlFillerSync::batch_insert(self, table_name, data)?;
917        Ok(())
918    }
919
920    fn get_stats(&self) -> FillingStats {
921        self.stats.clone()
922    }
923
924    fn truncate_table(&mut self, table_name: &str) -> Result<()> {
925        // 使用真正的 SQL 执行截断表操作
926        let rt = tokio::runtime::Runtime::new()
927            .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
928        
929        rt.block_on(async {
930            let pool = sqlx::mysql::MySqlPoolOptions::new()
931                .max_connections(1)
932                .connect(&self.connection_string)
933                .await
934                .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
935            
936            let sql = format!("TRUNCATE TABLE {}", table_name);
937            sqlx::query(&sql)
938                .execute(&pool)
939                .await
940                .map_err(|e| DataForgeError::database(&format!("Failed to truncate table: {}", e)))?;
941            
942            pool.close().await;
943            Ok(())
944        })
945    }
946
947    fn drop_table(&mut self, table_name: &str) -> Result<()> {
948        // 使用真正的 SQL 执行删除表操作
949        let rt = tokio::runtime::Runtime::new()
950            .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
951        
952        rt.block_on(async {
953            let pool = sqlx::mysql::MySqlPoolOptions::new()
954                .max_connections(1)
955                .connect(&self.connection_string)
956                .await
957                .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
958            
959            let sql = format!("DROP TABLE IF EXISTS {}", table_name);
960            sqlx::query(&sql)
961                .execute(&pool)
962                .await
963                .map_err(|e| DataForgeError::database(&format!("Failed to drop table: {}", e)))?;
964            
965            pool.close().await;
966            Ok(())
967        })
968    }
969}
970
971#[cfg(test)]
972mod tests {
973    use super::*;
974    use crate::filling::ConnectionConfig;
975    use crate::{forge, name};
976
977    #[test]
978    fn test_mysql_filler_sync_creation() {
979        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
980            .with_port(13307);
981        let filling_config = FillingConfig::default();
982        
983        let filler = MySqlFillerSync::from_config(config, filling_config);
984        assert!(filler.is_ok());
985    }
986
987    #[test]
988    fn test_mysql_connection_string() {
989        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
990            .with_port(13307);
991        
992        let connection_string = config.to_connection_string().unwrap();
993        assert!(connection_string.contains("mysql://root:Aa123456@localhost:13307/demo"));
994    }
995
996    #[test]
997    fn test_generate_sample_employee() {
998        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
999            .with_port(13307);
1000        let filling_config = FillingConfig::default();
1001        let filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1002        
1003        let employee = generate_sample_employee(1);
1004        assert!(employee.is_object());
1005        // 更新测试以适应动态生成方式
1006        // 我们现在验证生成的对象至少有一个字段,而不是特定的硬编码字段
1007        assert!(!employee.as_object().unwrap().is_empty());
1008    }
1009
1010    #[test]
1011    fn test_generate_sample_employee_with_actual_schema() {
1012        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1013            .with_port(13307);
1014        let filling_config = FillingConfig::default();
1015        let filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1016        
1017        // 测试获取表结构的功能
1018        let _schema_result = filler.get_table_schema("employees");
1019        // 注意:在没有真实数据库连接的测试环境中,这个调用可能会失败
1020        // 但我们仍然可以测试错误处理路径
1021        
1022        // 无论如何,generate_sample_employee 应该能够生成数据
1023        let employee = generate_sample_employee(1);
1024        assert!(employee.is_object());
1025        // 现在我们只验证至少有一个字段存在,而不是特定的硬编码字段
1026        assert!(!employee.as_object().unwrap().is_empty());
1027    }
1028
1029    #[test]
1030    fn test_dynamic_schema_generation() {
1031        // 创建一个自定义的表结构来测试动态生成
1032        let custom_schema = TableSchema {
1033            name: "custom_employees".to_string(),
1034            fields: vec![
1035                FieldSchema {
1036                    name: "employee_id".to_string(),
1037                    data_type: DataType::Integer { min: None, max: None },
1038                    constraints: crate::db::schema::FieldConstraints {
1039                        nullable: false,
1040                        unique: true,
1041                        default: None,
1042                        pattern: None,
1043                        min: None,
1044                        max: None,
1045                    },
1046                    description: None,
1047                    generator_config: None,
1048                    generator_type: None,
1049                },
1050                FieldSchema {
1051                    name: "full_name".to_string(),
1052                    data_type: DataType::String { max_length: Some(100) },
1053                    constraints: crate::db::schema::FieldConstraints {
1054                        nullable: false,
1055                        unique: false,
1056                        default: None,
1057                        pattern: None,
1058                        min: None,
1059                        max: None,
1060                    },
1061                    description: None,
1062                    generator_config: None,
1063                    generator_type: None,
1064                },
1065                FieldSchema {
1066                    name: "birth_date".to_string(),
1067                    data_type: DataType::Date { format: None },
1068                    constraints: crate::db::schema::FieldConstraints {
1069                        nullable: false,
1070                        unique: false,
1071                        default: None,
1072                        pattern: None,
1073                        min: None,
1074                        max: None,
1075                    },
1076                    description: None,
1077                    generator_config: None,
1078                    generator_type: None,
1079                },
1080                FieldSchema {
1081                    name: "salary".to_string(),
1082                    data_type: DataType::Integer { min: Some(0), max: Some(1000000) },
1083                    constraints: crate::db::schema::FieldConstraints {
1084                        nullable: false,
1085                        unique: false,
1086                        default: None,
1087                        pattern: None,
1088                        min: None,
1089                        max: None,
1090                    },
1091                    description: None,
1092                    generator_config: None,
1093                    generator_type: None,
1094                },
1095            ],
1096            primary_key: Some(vec!["employee_id".to_string()]),
1097            indexes: vec![],
1098            description: None,
1099        };
1100        
1101        // 使用自定义表结构生成数据
1102        let employee_data = crate::filling::utils::generate_sample_data_by_schema(&custom_schema, 1);
1103        
1104        // 验证生成的数据字段与表结构匹配
1105        assert!(employee_data.is_object());
1106        let employee_obj = employee_data.as_object().unwrap();
1107        
1108        // 验证所有定义的字段都存在
1109        assert!(employee_obj.contains_key("employee_id"));
1110        assert!(employee_obj.contains_key("full_name"));
1111        assert!(employee_obj.contains_key("birth_date"));
1112        assert!(employee_obj.contains_key("salary"));
1113        
1114        // 验证字段值类型正确
1115        assert!(employee_obj.get("employee_id").unwrap().is_number());
1116        assert!(employee_obj.get("full_name").unwrap().is_string());
1117        assert!(employee_obj.get("birth_date").unwrap().is_string());
1118        assert!(employee_obj.get("salary").unwrap().is_number());
1119    }
1120
1121    #[test]
1122    fn test_fill_employees_table() {
1123        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1124            .with_port(13307);
1125        let filling_config = FillingConfig::default();
1126        let mut filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1127        
1128        let result = filler.fill_table("employees", 10);
1129        match &result {
1130            Ok(count) => {
1131                println!("成功插入 {} 条记录", count);
1132            },
1133            Err(e) => {
1134                println!("填充数据失败: {}", e);
1135                // 这个测试可能因为没有真实的MySQL服务器而失败
1136                // 在没有数据库连接的情况下,这是预期的行为
1137                return;
1138            }
1139        }
1140        
1141        assert!(result.is_ok());
1142        assert_eq!(result.unwrap(), 10);
1143        
1144        let stats = filler.get_stats();
1145        assert_eq!(stats.successful_rows, 10);
1146    }
1147
1148    #[test]
1149    fn test_table_schema() {
1150        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1151            .with_port(13307);
1152        let filling_config = FillingConfig::default();
1153        let filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1154        
1155        let schema = filler.get_table_schema("employees");
1156        match &schema {
1157            Ok(s) => {
1158                println!("成功获取表结构: {}", s.name);
1159                println!("字段数: {}", s.fields.len());
1160            },
1161            Err(e) => {
1162                println!("获取表结构失败: {}", e);
1163                // 这个测试可能因为没有真实的MySQL服务器而失败
1164                // 在没有数据库连接的情况下,这是预期的行为
1165                return;
1166            }
1167        }
1168        
1169        assert!(schema.is_ok());
1170        
1171        let schema = schema.unwrap();
1172        assert_eq!(schema.name, "employees");
1173        assert!(!schema.fields.is_empty());
1174    }
1175
1176    #[test]
1177    fn test_batch_insert() {
1178        let config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1179            .with_port(13307);
1180        let filling_config = FillingConfig::default();
1181        let mut filler = MySqlFillerSync::from_config(config, filling_config).unwrap();
1182        
1183        let mut test_data = Vec::new();
1184        for i in 1..=5 {
1185            let mut row = HashMap::new();
1186            row.insert("emp_no".to_string(), Value::Number((100000 + i).into()));
1187            row.insert("first_name".to_string(), Value::String(format!("First{}", i)));
1188            row.insert("last_name".to_string(), Value::String(format!("Last{}", i)));
1189            test_data.push(row);
1190        }
1191        
1192        let result = filler.batch_insert("employees", test_data);
1193        // 在没有数据库连接的情况下,这个测试可能会失败,这是预期的行为
1194        match &result {
1195            Ok(count) => {
1196                assert_eq!(*count, 5);
1197            },
1198            Err(e) => {
1199                // 如果没有数据库连接,错误是预期的
1200                println!("数据库连接失败(在预期中): {}", e);
1201                // 我们仍然可以检查错误类型是否正确
1202                // 但由于这是一个集成测试,我们接受任何数据库错误
1203            }
1204        }
1205    }
1206
1207    /// 生成示例员工数据
1208    fn generate_sample_employee(index: usize) -> Value {
1209        let connection_config = ConnectionConfig::mysql("localhost", "demo", "root", "Aa123456")
1210            .with_port(13307)
1211            .with_param("charset", "utf8mb4");
1212
1213        let filling_config = FillingConfig {
1214            batch_size: 100,
1215            use_transaction: true,
1216            timeout_seconds: 60,
1217            retry_count: 2,
1218        };
1219
1220        let mut filler = MySqlFillerSync::from_config(connection_config, filling_config).unwrap();
1221
1222        // 尝试从数据库获取实际的员工表结构
1223        match filler.get_table_schema("employees") {
1224            Ok(schema) => {
1225                // 使用实际的表结构生成数据
1226                crate::filling::utils::generate_sample_data_by_schema(&schema, index)
1227            },
1228            Err(_) => {
1229                // 如果无法获取表结构,创建一个最小化的通用表结构
1230                // 这样可以确保即使在没有数据库连接的情况下也能生成基本数据
1231                let generic_schema = TableSchema {
1232                    name: "employees".to_string(),
1233                    fields: vec![
1234                        FieldSchema {
1235                            name: "id".to_string(),
1236                            data_type: DataType::Integer { min: None, max: None },
1237                            constraints: crate::db::schema::FieldConstraints {
1238                                nullable: false,
1239                                unique: true,
1240                                default: None,
1241                                pattern: None,
1242                                min: None,
1243                                max: None,
1244                            },
1245                            description: None,
1246                            generator_config: None,
1247                            generator_type: None,
1248                        },
1249                    ],
1250                    primary_key: Some(vec!["id".to_string()]),
1251                    indexes: vec![],
1252                    description: None,
1253                };
1254
1255                crate::filling::utils::generate_sample_data_by_schema(&generic_schema, index)
1256            }
1257        }
1258    }
1259
1260}