// dataforge/filling/filling.rs

//! Data filling logic implementation.
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5use serde_json::Value;
6use crate::error::{DataForgeError, Result};
7use crate::db::schema::TableSchema;
8use super::{DatabaseFiller, FillingConfig, FillingStats, ConnectionManager};
9
/// Executes data filling: creates the target table and bulk-inserts rows,
/// tracking progress in [`FillingStats`].
pub struct FillingExecutor {
    // Connection details and database type for the target database.
    connection_manager: ConnectionManager,
    // Filling knobs: batch size, retry count, etc.
    config: FillingConfig,
    // Counters for the most recent run; reset at the start of each execute_* call.
    stats: FillingStats,
}
16
17impl FillingExecutor {
18    /// 创建新的填充执行器
19    pub fn new(connection_manager: ConnectionManager, config: FillingConfig) -> Self {
20        Self {
21            connection_manager,
22            config,
23            stats: FillingStats::default(),
24        }
25    }
26
27    /// 执行完整的数据填充流程
28    pub fn execute_filling(&mut self, schema: &TableSchema, data: Vec<HashMap<String, Value>>) -> Result<()> {
29        let start_time = Instant::now();
30        
31        // 重置统计信息
32        self.stats = FillingStats::default();
33        self.stats.total_rows = data.len();
34
35        // 创建表
36        self.create_table(schema)?;
37
38        // 批量插入数据
39        self.batch_insert(&schema.name, data)?;
40
41        // 更新统计信息
42        self.stats.processing_time_ms = start_time.elapsed().as_millis() as u64;
43        self.stats.calculate_speed();
44
45        Ok(())
46    }
47
48    /// 执行数据填充(带重试机制)
49    pub fn execute_with_retry(&mut self, schema: &TableSchema, data: Vec<HashMap<String, Value>>) -> Result<()> {
50        let mut last_error = None;
51        
52        for attempt in 0..=self.config.retry_count {
53            match self.execute_filling(schema, data.clone()) {
54                Ok(()) => return Ok(()),
55                Err(e) => {
56                    last_error = Some(e);
57                    if attempt < self.config.retry_count {
58                        // 等待一段时间后重试
59                        std::thread::sleep(Duration::from_millis(1000 * (attempt + 1) as u64));
60                    }
61                }
62            }
63        }
64
65        Err(last_error.unwrap_or_else(|| DataForgeError::generator("Unknown error during retry")))
66    }
67
68    /// 分块处理大量数据
69    pub fn execute_chunked(&mut self, schema: &TableSchema, data: Vec<HashMap<String, Value>>) -> Result<()> {
70        let start_time = Instant::now();
71        
72        // 重置统计信息
73        self.stats = FillingStats::default();
74        self.stats.total_rows = data.len();
75
76        // 创建表
77        self.create_table(schema)?;
78
79        // 分块处理数据
80        let chunks: Vec<_> = data.chunks(self.config.batch_size).collect();
81        
82        for (i, chunk) in chunks.iter().enumerate() {
83            match self.insert_data(&schema.name, chunk.to_vec()) {
84                Ok(()) => {
85                    self.stats.successful_rows += chunk.len();
86                }
87                Err(e) => {
88                    self.stats.failed_rows += chunk.len();
89                    eprintln!("Failed to insert chunk {}: {}", i, e);
90                }
91            }
92        }
93
94        // 更新统计信息
95        self.stats.processing_time_ms = start_time.elapsed().as_millis() as u64;
96        self.stats.calculate_speed();
97
98        Ok(())
99    }
100
101    /// 验证数据完整性
102    pub fn validate_data(&self, data: &[HashMap<String, Value>], schema: &TableSchema) -> Result<Vec<String>> {
103        let mut errors = Vec::new();
104
105        for (index, row) in data.iter().enumerate() {
106            // 检查必需字段
107            for field in &schema.fields {
108                if !field.constraints.nullable && !row.contains_key(&field.name) {
109                    errors.push(format!("Row {}: Missing required field '{}'", index, field.name));
110                }
111            }
112
113            // 检查唯一性约束(简单检查,实际应该查询数据库)
114            for field in &schema.fields {
115                if field.constraints.unique {
116                    if let Some(value) = row.get(&field.name) {
117                        if value.is_null() && !field.constraints.nullable {
118                            errors.push(format!("Row {}: Unique field '{}' cannot be null", index, field.name));
119                        }
120                    }
121                }
122            }
123
124            // 检查数据类型
125            for (field_name, value) in row {
126                if let Some(field) = schema.fields.iter().find(|f| f.name == *field_name) {
127                    if let Err(e) = self.validate_field_value(value, field) {
128                        errors.push(format!("Row {}: Field '{}' - {}", index, field_name, e));
129                    }
130                }
131            }
132        }
133
134        Ok(errors)
135    }
136
137    /// 验证字段值
138    fn validate_field_value(&self, value: &Value, field: &crate::db::schema::FieldSchema) -> Result<()> {
139        use crate::db::schema::DataType;
140
141        match (&field.data_type, value) {
142            (DataType::String { max_length }, Value::String(s)) => {
143                if let Some(max_len) = max_length {
144                    if s.len() > *max_len {
145                        return Err(DataForgeError::validation(&format!(
146                            "String length {} exceeds maximum {}", s.len(), max_len
147                        )));
148                    }
149                }
150            },
151            (DataType::Integer { min, max }, Value::Number(n)) => {
152                if let Some(i) = n.as_i64() {
153                    if let Some(min_val) = min {
154                        if i < *min_val {
155                            return Err(DataForgeError::validation(&format!(
156                                "Integer {} is less than minimum {}", i, min_val
157                            )));
158                        }
159                    }
160                    if let Some(max_val) = max {
161                        if i > *max_val {
162                            return Err(DataForgeError::validation(&format!(
163                                "Integer {} exceeds maximum {}", i, max_val
164                            )));
165                        }
166                    }
167                } else {
168                    return Err(DataForgeError::validation("Expected integer value"));
169                }
170            },
171            (DataType::Email, Value::String(s)) => {
172                if !s.contains('@') || !s.contains('.') {
173                    return Err(DataForgeError::validation("Invalid email format"));
174                }
175            },
176            (DataType::Phone { .. }, Value::String(s)) => {
177                if s.is_empty() || s.len() > 20 {
178                    return Err(DataForgeError::validation("Invalid phone number format"));
179                }
180            },
181            _ => {} // 其他类型的验证可以在这里添加
182        }
183
184        Ok(())
185    }
186
187    /// 生成插入SQL语句
188    fn generate_insert_sql(&self, table_name: &str, data: &[HashMap<String, Value>]) -> Result<String> {
189        if data.is_empty() {
190            return Err(DataForgeError::validation("No data to insert"));
191        }
192
193        let first_row = &data[0];
194        let columns: Vec<&String> = first_row.keys().collect();
195        
196        let mut sql = format!("INSERT INTO {} (", table_name);
197        sql.push_str(&columns.iter().map(|c| c.as_str()).collect::<Vec<_>>().join(", "));
198        sql.push_str(") VALUES ");
199
200        let value_clauses: Vec<String> = data.iter().map(|row| {
201            let values: Vec<String> = columns.iter().map(|col| {
202                match row.get(*col) {
203                    Some(Value::String(s)) => format!("'{}'", s.replace("'", "''")),
204                    Some(Value::Number(n)) => n.to_string(),
205                    Some(Value::Bool(b)) => if *b { "TRUE" } else { "FALSE" }.to_string(),
206                    Some(Value::Null) | None => "NULL".to_string(),
207                    Some(other) => format!("'{}'", other.to_string().replace("'", "''")),
208                }
209            }).collect();
210            format!("({})", values.join(", "))
211        }).collect();
212
213        sql.push_str(&value_clauses.join(", "));
214        Ok(sql)
215    }
216    
217    /// 执行 SQL 语句
218    fn execute_sql(&self, sql: &str) -> Result<()> {
219        // 根据连接管理器中的连接信息执行真正的 SQL
220        match &self.connection_manager.database_type() {
221            crate::filling::DatabaseType::MySQL => {
222                self.execute_mysql_sql(sql)
223            },
224            crate::filling::DatabaseType::PostgreSQL => {
225                self.execute_postgres_sql(sql)
226            },
227            crate::filling::DatabaseType::SQLite => {
228                self.execute_sqlite_sql(sql)
229            },
230        }
231    }
232    
    /// Executes a single SQL statement against MySQL.
    ///
    /// Builds a fresh Tokio runtime and a one-connection sqlx pool per call,
    /// then tears both down again — simple but expensive; acceptable for
    /// batch filling, but a shared runtime/pool would be needed for high
    /// call rates. Only compiled with the `database` feature; otherwise an
    /// error is returned unconditionally.
    fn execute_mysql_sql(&self, sql: &str) -> Result<()> {
        #[cfg(feature = "database")]
        {
            let connection_string = self.connection_manager.get_connection_string()?;
            // A dedicated runtime lets this synchronous API drive async sqlx.
            let rt = tokio::runtime::Runtime::new()
                .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
            
            rt.block_on(async {
                let pool = sqlx::mysql::MySqlPoolOptions::new()
                    .max_connections(1)
                    .connect(&connection_string)
                    .await
                    .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
                
                sqlx::query(sql)
                    .execute(&pool)
                    .await
                    .map_err(|e| DataForgeError::database(&format!("Failed to execute SQL: {}", e)))?;
                
                // Close explicitly so the pool's connections are released
                // before the runtime is dropped.
                pool.close().await;
                Ok(())
            })
        }
        #[cfg(not(feature = "database"))]
        {
            Err(DataForgeError::database("Database feature not enabled"))
        }
    }
262    
    /// Executes a single SQL statement against PostgreSQL.
    ///
    /// Same per-call runtime/pool pattern as the MySQL path: a fresh Tokio
    /// runtime and a one-connection sqlx pool are created and torn down for
    /// every statement. Only compiled with the `database` feature; otherwise
    /// an error is returned unconditionally.
    fn execute_postgres_sql(&self, sql: &str) -> Result<()> {
        #[cfg(feature = "database")]
        {
            let connection_string = self.connection_manager.get_connection_string()?;
            // A dedicated runtime lets this synchronous API drive async sqlx.
            let rt = tokio::runtime::Runtime::new()
                .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
            
            rt.block_on(async {
                let pool = sqlx::postgres::PgPoolOptions::new()
                    .max_connections(1)
                    .connect(&connection_string)
                    .await
                    .map_err(|e| DataForgeError::database(&format!("Failed to connect to PostgreSQL: {}", e)))?;
                
                sqlx::query(sql)
                    .execute(&pool)
                    .await
                    .map_err(|e| DataForgeError::database(&format!("Failed to execute SQL: {}", e)))?;
                
                // Close explicitly so the pool's connections are released
                // before the runtime is dropped.
                pool.close().await;
                Ok(())
            })
        }
        #[cfg(not(feature = "database"))]
        {
            Err(DataForgeError::database("Database feature not enabled"))
        }
    }
292    
    /// Executes a single SQL statement against SQLite.
    ///
    /// Same per-call runtime/pool pattern as the other backends: a fresh
    /// Tokio runtime and a one-connection sqlx pool are created and torn
    /// down for every statement. Only compiled with the `database` feature;
    /// otherwise an error is returned unconditionally.
    fn execute_sqlite_sql(&self, sql: &str) -> Result<()> {
        #[cfg(feature = "database")]
        {
            let connection_string = self.connection_manager.get_connection_string()?;
            // A dedicated runtime lets this synchronous API drive async sqlx.
            let rt = tokio::runtime::Runtime::new()
                .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
            
            rt.block_on(async {
                let pool = sqlx::sqlite::SqlitePoolOptions::new()
                    .max_connections(1)
                    .connect(&connection_string)
                    .await
                    .map_err(|e| DataForgeError::database(&format!("Failed to connect to SQLite: {}", e)))?;
                
                sqlx::query(sql)
                    .execute(&pool)
                    .await
                    .map_err(|e| DataForgeError::database(&format!("Failed to execute SQL: {}", e)))?;
                
                // Close explicitly so the pool's connections are released
                // before the runtime is dropped.
                pool.close().await;
                Ok(())
            })
        }
        #[cfg(not(feature = "database"))]
        {
            Err(DataForgeError::database("Database feature not enabled"))
        }
    }
322    
323    /// 生成 MySQL CREATE TABLE SQL
324    fn generate_create_table_sql_mysql(&self, schema: &TableSchema) -> Result<String> {
325        use crate::db::schema::DataType;
326        
327        let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
328        
329        let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
330            let data_type = match &field.data_type {
331                DataType::String { max_length } => {
332                    format!("VARCHAR({})", max_length.unwrap_or(255))
333                },
334                DataType::Integer { .. } => "INT".to_string(),
335                DataType::Float { .. } => "FLOAT".to_string(),
336                DataType::Boolean => "BOOLEAN".to_string(),
337                DataType::DateTime { .. } => "DATETIME".to_string(),
338                DataType::Date { .. } => "DATE".to_string(),
339                DataType::Email => "VARCHAR(255)".to_string(),
340                DataType::Phone { .. } => "VARCHAR(20)".to_string(),
341                _ => "TEXT".to_string(),
342            };
343            
344            let mut def = format!("{} {}", field.name, data_type);
345            
346            if !field.constraints.nullable {
347                def.push_str(" NOT NULL");
348            }
349            
350            if field.constraints.unique {
351                def.push_str(" UNIQUE");
352            }
353            
354            def
355        }).collect();
356        
357        sql.push_str(&field_definitions.join(", "));
358        
359        if let Some(ref pk) = schema.primary_key {
360            sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
361        }
362        
363        sql.push_str(")");
364        Ok(sql)
365    }
366    
367    /// 生成 PostgreSQL CREATE TABLE SQL
368    fn generate_create_table_sql_postgres(&self, schema: &TableSchema) -> Result<String> {
369        use crate::db::schema::DataType;
370        
371        let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
372        
373        let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
374            let data_type = match &field.data_type {
375                DataType::String { max_length } => {
376                    format!("VARCHAR({})", max_length.unwrap_or(255))
377                },
378                DataType::Integer { .. } => "INTEGER".to_string(),
379                DataType::Float { .. } => "REAL".to_string(),
380                DataType::Boolean => "BOOLEAN".to_string(),
381                DataType::DateTime { .. } => "TIMESTAMP".to_string(),
382                DataType::Date { .. } => "DATE".to_string(),
383                DataType::Email => "VARCHAR(255)".to_string(),
384                DataType::Phone { .. } => "VARCHAR(20)".to_string(),
385                _ => "TEXT".to_string(),
386            };
387            
388            let mut def = format!("{} {}", field.name, data_type);
389            
390            if !field.constraints.nullable {
391                def.push_str(" NOT NULL");
392            }
393            
394            if field.constraints.unique {
395                def.push_str(" UNIQUE");
396            }
397            
398            def
399        }).collect();
400        
401        sql.push_str(&field_definitions.join(", "));
402        
403        if let Some(ref pk) = schema.primary_key {
404            sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
405        }
406        
407        sql.push_str(")");
408        Ok(sql)
409    }
410    
411    /// 生成 SQLite CREATE TABLE SQL
412    fn generate_create_table_sql_sqlite(&self, schema: &TableSchema) -> Result<String> {
413        use crate::db::schema::DataType;
414        
415        let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
416        
417        let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
418            let data_type = match &field.data_type {
419                DataType::String { .. } => "TEXT".to_string(),
420                DataType::Integer { .. } => "INTEGER".to_string(),
421                DataType::Float { .. } => "REAL".to_string(),
422                DataType::Boolean => "INTEGER".to_string(), // SQLite uses INTEGER for BOOLEAN
423                DataType::DateTime { .. } => "TEXT".to_string(), // SQLite stores datetime as TEXT
424                DataType::Date { .. } => "TEXT".to_string(),
425                DataType::Email => "TEXT".to_string(),
426                DataType::Phone { .. } => "TEXT".to_string(),
427                _ => "TEXT".to_string(),
428            };
429            
430            let mut def = format!("{} {}", field.name, data_type);
431            
432            if !field.constraints.nullable {
433                def.push_str(" NOT NULL");
434            }
435            
436            if field.constraints.unique {
437                def.push_str(" UNIQUE");
438            }
439            
440            def
441        }).collect();
442        
443        sql.push_str(&field_definitions.join(", "));
444        
445        if let Some(ref pk) = schema.primary_key {
446            sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
447        }
448        
449        sql.push_str(")");
450        Ok(sql)
451    }
452}
453
454impl DatabaseFiller for FillingExecutor {
455    fn create_table(&mut self, schema: &TableSchema) -> Result<()> {
456        // 根据连接配置的数据库类型执行真正的创建表操作
457        match &self.connection_manager.database_type() {
458            crate::filling::DatabaseType::MySQL => {
459                let sql = self.generate_create_table_sql_mysql(schema)?;
460                self.execute_sql(&sql)
461            },
462            crate::filling::DatabaseType::PostgreSQL => {
463                let sql = self.generate_create_table_sql_postgres(schema)?;
464                self.execute_sql(&sql)
465            },
466            crate::filling::DatabaseType::SQLite => {
467                let sql = self.generate_create_table_sql_sqlite(schema)?;
468                self.execute_sql(&sql)
469            },
470        }
471    }
472
473    fn insert_data(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
474        if data.is_empty() {
475            return Ok(());
476        }
477
478        // 生成SQL语句并执行真正的插入操作
479        let sql = self.generate_insert_sql(table_name, &data)?;
480        self.execute_sql(&sql)?;
481        
482        self.stats.successful_rows += data.len();
483        Ok(())
484    }
485
486    fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
487        let chunks: Vec<_> = data.chunks(self.config.batch_size).collect();
488        
489        for chunk in chunks {
490            let sql = self.generate_insert_sql(table_name, &chunk.to_vec())?;
491            self.execute_sql(&sql)?;
492            self.stats.successful_rows += chunk.len();
493        }
494        
495        Ok(())
496    }
497
    /// Returns a snapshot of the current filling statistics; the clone keeps
    /// internal counters private to the executor.
    fn get_stats(&self) -> FillingStats {
        self.stats.clone()
    }
501
502    fn truncate_table(&mut self, table_name: &str) -> Result<()> {
503        // 执行真正的 TRUNCATE 操作
504        let sql = format!("TRUNCATE TABLE {}", table_name);
505        self.execute_sql(&sql)
506    }
507
508    fn drop_table(&mut self, table_name: &str) -> Result<()> {
509        // 执行真正的 DROP TABLE 操作
510        let sql = format!("DROP TABLE IF EXISTS {}", table_name);
511        self.execute_sql(&sql)
512    }
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filling::ConnectionConfig;
    use crate::db::schema::{FieldSchema, FieldConstraints, DataType};

    /// Two-column schema: a unique non-null integer `id` (also the primary
    /// key) and a non-null `name` capped at 100 characters.
    fn create_test_schema() -> TableSchema {
        let id_field = FieldSchema {
            name: "id".to_string(),
            data_type: DataType::Integer { min: None, max: None },
            constraints: FieldConstraints {
                nullable: false,
                unique: true,
                default: None,
                pattern: None,
                min: None,
                max: None,
            },
            description: None,
            generator_config: None,
            generator_type: None,
        };
        let name_field = FieldSchema {
            name: "name".to_string(),
            data_type: DataType::String { max_length: Some(100) },
            constraints: FieldConstraints {
                nullable: false,
                unique: false,
                default: None,
                pattern: None,
                min: None,
                max: None,
            },
            description: None,
            generator_config: None,
            generator_type: None,
        };

        TableSchema {
            name: "test_table".to_string(),
            fields: vec![id_field, name_field],
            primary_key: Some(vec!["id".to_string()]),
            indexes: vec![],
            description: None,
        }
    }

    /// Two sample rows (id + name) matching `create_test_schema`.
    fn create_test_data() -> Vec<HashMap<String, Value>> {
        [(1, "Alice"), (2, "Bob")]
            .iter()
            .map(|&(id, name)| {
                let mut row = HashMap::new();
                row.insert("id".to_string(), Value::Number(serde_json::Number::from(id)));
                row.insert("name".to_string(), Value::String(name.to_string()));
                row
            })
            .collect()
    }

    /// Executor backed by a SQLite connection config and default settings.
    fn make_executor() -> FillingExecutor {
        let config = ConnectionConfig::sqlite("test.db");
        let connection_manager = ConnectionManager::new(config).unwrap();
        FillingExecutor::new(connection_manager, FillingConfig::default())
    }

    #[test]
    fn test_filling_executor_creation() {
        let executor = make_executor();
        assert_eq!(executor.stats.total_rows, 0);
    }

    #[test]
    fn test_validate_data() {
        let executor = make_executor();
        let errors = executor
            .validate_data(&create_test_data(), &create_test_schema())
            .unwrap();
        assert!(errors.is_empty());
    }

    #[test]
    fn test_generate_insert_sql() {
        let executor = make_executor();
        let sql = executor
            .generate_insert_sql("test_table", &create_test_data())
            .unwrap();

        assert!(sql.contains("INSERT INTO test_table"));
        assert!(sql.contains("VALUES"));
        assert!(sql.contains("Alice"));
        assert!(sql.contains("Bob"));
    }
}