dataforge/filling/
mod.rs

//! Database filling module.
//!
//! Automates database table creation and data population, with support for
//! schema inference and automatic matching.
4
5use std::collections::HashMap;
6use serde_json::Value;
7use crate::error::{DataForgeError, Result};
8
9pub mod connection;
10pub mod filling;
11pub mod batch;
12pub mod mysql;
13pub mod generators;
14pub mod utils;
15
16pub use connection::*;
17pub use filling::*;
18pub use batch::*;
19pub use mysql::*;
20pub use utils::*;
21
22use crate::db::schema::{DataType, TableSchema};
23
/// Settings that control how a database fill run is carried out.
#[derive(Debug, Clone)]
pub struct FillingConfig {
    /// Number of rows per batch insert.
    pub batch_size: usize,
    /// Whether transactions are enabled.
    pub use_transaction: bool,
    /// Timeout, in seconds.
    pub timeout_seconds: u64,
    /// Number of retries.
    pub retry_count: usize,
}

impl Default for FillingConfig {
    /// Returns the stock configuration: batches of 1000 rows, transactions
    /// enabled, a 30 second timeout and 3 retries.
    fn default() -> Self {
        let batch_size = 1000;
        let use_transaction = true;
        let timeout_seconds = 30;
        let retry_count = 3;

        FillingConfig {
            batch_size,
            use_transaction,
            timeout_seconds,
            retry_count,
        }
    }
}
47
/// Counters and timing collected for a database fill run.
#[derive(Debug, Clone, Default)]
pub struct FillingStats {
    /// Total number of rows submitted for insertion.
    pub total_rows: usize,
    /// Number of rows inserted successfully.
    pub successful_rows: usize,
    /// Number of rows that failed to insert.
    pub failed_rows: usize,
    /// Processing time, in milliseconds.
    pub processing_time_ms: u64,
    /// Average insert speed, in rows per second.
    pub rows_per_second: f64,
}

impl FillingStats {
    /// Recomputes `rows_per_second` from `successful_rows` and
    /// `processing_time_ms`.
    ///
    /// When no processing time has been recorded (`processing_time_ms == 0`)
    /// a rate cannot be derived, so the speed is reset to `0.0` rather than
    /// left holding a value from an earlier computation.
    pub fn calculate_speed(&mut self) {
        self.rows_per_second = if self.processing_time_ms == 0 {
            0.0
        } else {
            // Scale by 1000 before dividing to convert milliseconds to seconds.
            (self.successful_rows as f64 * 1000.0) / self.processing_time_ms as f64
        };
    }
}
71
/// Interface for database fillers.
///
/// Every mutating operation takes `&mut self` and reports failure through the
/// crate's `Result` alias. The trait only fixes the signatures; how each
/// backend executes them is up to the implementation.
pub trait DatabaseFiller {
    /// Creates the table described by `schema`.
    fn create_table(&mut self, schema: &TableSchema) -> Result<()>;

    /// Inserts `data` into `table_name`.
    ///
    /// Each element of `data` is one row, keyed by name with JSON values
    /// (presumably column names — confirm against implementations).
    fn insert_data(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()>;

    /// Batch-inserts `data` into `table_name`.
    ///
    /// NOTE(review): how this differs from `insert_data` (e.g. whether it
    /// honours `FillingConfig::batch_size`) is not visible here — confirm
    /// against implementations.
    fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()>;

    /// Returns the filler's statistics by value.
    fn get_stats(&self) -> FillingStats;

    /// Truncates (empties) the table `table_name`.
    fn truncate_table(&mut self, table_name: &str) -> Result<()>;

    /// Drops the table `table_name`.
    fn drop_table(&mut self, table_name: &str) -> Result<()>;
}
92
/// Generic database filler.
///
/// Bundles a configuration, run statistics and a connection string. The
/// methods visible in this file only validate rows and render SQL text; none
/// of them read these fields, hence the `dead_code` allowances below.
pub struct GenericFiller {
    // Stored by `new`; not read by any code visible in this file.
    #[allow(dead_code)]
    config: FillingConfig,
    // Initialised to `FillingStats::default()` by `new`; not read in this file.
    #[allow(dead_code)]
    stats: FillingStats,
    // Stored verbatim by `new`; not read in this file.
    #[allow(dead_code)]
    connection_string: String,
}
102
103impl GenericFiller {
104    /// 创建新的填充器
105    pub fn new(connection_string: String, config: FillingConfig) -> Self {
106        Self {
107            config,
108            stats: FillingStats::default(),
109            connection_string,
110        }
111    }
112
113    /// 验证数据类型
114    fn validate_data_type(&self, value: &Value, data_type: &DataType) -> Result<()> {
115        match (value, data_type) {
116            (Value::String(_), DataType::String { .. }) => Ok(()),
117            (Value::Number(n), DataType::Integer { .. }) if n.is_i64() => Ok(()),
118            (Value::Number(n), DataType::Float { .. }) if n.is_f64() => Ok(()),
119            (Value::Bool(_), DataType::Boolean) => Ok(()),
120            (Value::String(_), DataType::DateTime { .. }) => Ok(()),
121            (Value::String(_), DataType::Date { .. }) => Ok(()),
122            (Value::String(_), DataType::Time { .. }) => Ok(()),
123            (Value::String(_), DataType::Uuid) => Ok(()),
124            (Value::String(_), DataType::Email) => Ok(()),
125            (Value::String(_), DataType::Phone { .. }) => Ok(()),
126            (Value::String(_), DataType::Url) => Ok(()),
127            (_, DataType::Json) => Ok(()),
128            _ => Err(DataForgeError::validation(&format!(
129                "Data type mismatch: expected {:?}, got {:?}", 
130                data_type, value
131            ))),
132        }
133    }
134
135    /// 验证数据行
136    pub fn validate_row(&self, row: &HashMap<String, Value>, schema: &TableSchema) -> Result<()> {
137        for field in &schema.fields {
138            if let Some(value) = row.get(&field.name) {
139                self.validate_data_type(value, &field.data_type)?;
140            } else if !field.constraints.nullable {
141                return Err(DataForgeError::validation(&format!(
142                    "Required field '{}' is missing", field.name
143                )));
144            }
145        }
146        Ok(())
147    }
148
149    /// 生成SQL创建表语句
150    pub fn generate_create_table_sql(&self, schema: &TableSchema) -> String {
151        let mut sql = format!("CREATE TABLE {} (\n", schema.name);
152        
153        let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
154            let mut def = format!("  {} {}", field.name, self.data_type_to_sql(&field.data_type));
155            
156            if !field.constraints.nullable {
157                def.push_str(" NOT NULL");
158            }
159            
160            if field.constraints.unique {
161                def.push_str(" UNIQUE");
162            }
163            
164            if let Some(default) = &field.constraints.default {
165                def.push_str(&format!(" DEFAULT {}", self.value_to_sql(default)));
166            }
167            
168            def
169        }).collect();
170        
171        sql.push_str(&field_definitions.join(",\n"));
172        
173        if let Some(pk) = &schema.primary_key {
174            sql.push_str(&format!(",\n  PRIMARY KEY ({})", pk.join(", ")));
175        }
176        
177        sql.push_str("\n)");
178        sql
179    }
180
181    /// 数据类型转SQL类型
182    fn data_type_to_sql(&self, data_type: &DataType) -> &'static str {
183        match data_type {
184            DataType::String { max_length } => {
185                if let Some(len) = max_length {
186                    if *len <= 255 {
187                        "VARCHAR(255)"
188                    } else {
189                        "TEXT"
190                    }
191                } else {
192                    "TEXT"
193                }
194            },
195            DataType::Integer { .. } => "INTEGER",
196            DataType::Float { .. } => "REAL",
197            DataType::Boolean => "BOOLEAN",
198            DataType::DateTime { .. } => "DATETIME",
199            DataType::Date { .. } => "DATE",
200            DataType::Time { .. } => "TIME",
201            DataType::Uuid => "VARCHAR(36)",
202            DataType::Email => "VARCHAR(255)",
203            DataType::Phone { .. } => "VARCHAR(20)",
204            DataType::Url => "VARCHAR(2048)",
205            DataType::Json => "JSON",
206            DataType::Enum { .. } => "VARCHAR(50)",
207            DataType::Array { .. } => "JSON",
208            DataType::Object { .. } => "JSON",
209            DataType::Custom { .. } => "TEXT",
210        }
211    }
212
213    /// 值转SQL字符串
214    fn value_to_sql(&self, value: &Value) -> String {
215        match value {
216            Value::String(s) => format!("'{}'", s.replace("'", "''")),
217            Value::Number(n) => n.to_string(),
218            Value::Bool(b) => if *b { "TRUE" } else { "FALSE" }.to_string(),
219            Value::Null => "NULL".to_string(),
220            _ => format!("'{}'", value.to_string().replace("'", "''")),
221        }
222    }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::schema::FieldConstraints;
    use crate::FieldSchema;

    /// Builds a filler with a placeholder connection string and the default
    /// configuration.
    fn test_filler() -> GenericFiller {
        GenericFiller::new("test".to_string(), FillingConfig::default())
    }

    /// Builds a non-nullable field with no default, pattern, bounds,
    /// description or generator settings.
    fn required_field(name: &str, data_type: DataType, unique: bool) -> FieldSchema {
        FieldSchema {
            name: name.to_string(),
            data_type,
            constraints: FieldConstraints {
                nullable: false,
                unique,
                default: None,
                pattern: None,
                min: None,
                max: None,
            },
            description: None,
            generator_config: None,
            generator_type: None,
        }
    }

    /// The default configuration carries the documented stock values.
    #[test]
    fn test_filling_config_default() {
        let FillingConfig {
            batch_size,
            use_transaction,
            timeout_seconds,
            retry_count,
        } = FillingConfig::default();

        assert_eq!(batch_size, 1000);
        assert!(use_transaction);
        assert_eq!(timeout_seconds, 30);
        assert_eq!(retry_count, 3);
    }

    /// 950 successful rows in 1000 ms is 950 rows per second.
    #[test]
    fn test_filling_stats() {
        let mut stats = FillingStats {
            total_rows: 1000,
            successful_rows: 950,
            failed_rows: 50,
            processing_time_ms: 1000,
            rows_per_second: 0.0,
        };

        stats.calculate_speed();

        assert_eq!(stats.rows_per_second, 950.0);
    }

    /// Matching value/type pairs pass validation.
    #[test]
    fn test_validate_data_type() {
        let filler = test_filler();

        let text = Value::String("test".to_string());
        let text_type = DataType::String { max_length: Some(255) };
        assert!(filler.validate_data_type(&text, &text_type).is_ok());

        let number = Value::Number(serde_json::Number::from(42));
        let integer_type = DataType::Integer { min: None, max: None };
        assert!(filler.validate_data_type(&number, &integer_type).is_ok());
    }

    /// The generated DDL contains the table name, both column definitions and
    /// the primary-key clause.
    #[test]
    fn test_generate_create_table_sql() {
        let filler = test_filler();

        let schema = TableSchema {
            name: "users".to_string(),
            fields: vec![
                required_field("id", DataType::Integer { min: None, max: None }, true),
                required_field("name", DataType::String { max_length: Some(100) }, false),
            ],
            primary_key: Some(vec!["id".to_string()]),
            indexes: vec![],
            description: None,
        };

        let sql = filler.generate_create_table_sql(&schema);

        let expected_fragments = [
            "CREATE TABLE users",
            "id INTEGER NOT NULL UNIQUE",
            "name VARCHAR(255) NOT NULL",
            "PRIMARY KEY (id)",
        ];
        for fragment in expected_fragments {
            assert!(sql.contains(fragment));
        }
    }
}