1use std::collections::HashMap;
4use std::time::{Duration, Instant};
5use serde_json::Value;
6use crate::error::{DataForgeError, Result};
7use crate::db::schema::TableSchema;
8use super::{DatabaseFiller, FillingConfig, FillingStats, ConnectionManager};
9
/// Executes table creation and bulk data insertion against a configured
/// database backend, accumulating per-run statistics.
pub struct FillingExecutor {
    // Connection settings; also resolves the target database type (MySQL/PostgreSQL/SQLite).
    connection_manager: ConnectionManager,
    // Filling options: batch size, retry count, etc.
    config: FillingConfig,
    // Per-run counters (total/successful/failed rows, timing); reset at the start of each run.
    stats: FillingStats,
}
16
17impl FillingExecutor {
18 pub fn new(connection_manager: ConnectionManager, config: FillingConfig) -> Self {
20 Self {
21 connection_manager,
22 config,
23 stats: FillingStats::default(),
24 }
25 }
26
27 pub fn execute_filling(&mut self, schema: &TableSchema, data: Vec<HashMap<String, Value>>) -> Result<()> {
29 let start_time = Instant::now();
30
31 self.stats = FillingStats::default();
33 self.stats.total_rows = data.len();
34
35 self.create_table(schema)?;
37
38 self.batch_insert(&schema.name, data)?;
40
41 self.stats.processing_time_ms = start_time.elapsed().as_millis() as u64;
43 self.stats.calculate_speed();
44
45 Ok(())
46 }
47
48 pub fn execute_with_retry(&mut self, schema: &TableSchema, data: Vec<HashMap<String, Value>>) -> Result<()> {
50 let mut last_error = None;
51
52 for attempt in 0..=self.config.retry_count {
53 match self.execute_filling(schema, data.clone()) {
54 Ok(()) => return Ok(()),
55 Err(e) => {
56 last_error = Some(e);
57 if attempt < self.config.retry_count {
58 std::thread::sleep(Duration::from_millis(1000 * (attempt + 1) as u64));
60 }
61 }
62 }
63 }
64
65 Err(last_error.unwrap_or_else(|| DataForgeError::generator("Unknown error during retry")))
66 }
67
68 pub fn execute_chunked(&mut self, schema: &TableSchema, data: Vec<HashMap<String, Value>>) -> Result<()> {
70 let start_time = Instant::now();
71
72 self.stats = FillingStats::default();
74 self.stats.total_rows = data.len();
75
76 self.create_table(schema)?;
78
79 let chunks: Vec<_> = data.chunks(self.config.batch_size).collect();
81
82 for (i, chunk) in chunks.iter().enumerate() {
83 match self.insert_data(&schema.name, chunk.to_vec()) {
84 Ok(()) => {
85 self.stats.successful_rows += chunk.len();
86 }
87 Err(e) => {
88 self.stats.failed_rows += chunk.len();
89 eprintln!("Failed to insert chunk {}: {}", i, e);
90 }
91 }
92 }
93
94 self.stats.processing_time_ms = start_time.elapsed().as_millis() as u64;
96 self.stats.calculate_speed();
97
98 Ok(())
99 }
100
101 pub fn validate_data(&self, data: &[HashMap<String, Value>], schema: &TableSchema) -> Result<Vec<String>> {
103 let mut errors = Vec::new();
104
105 for (index, row) in data.iter().enumerate() {
106 for field in &schema.fields {
108 if !field.constraints.nullable && !row.contains_key(&field.name) {
109 errors.push(format!("Row {}: Missing required field '{}'", index, field.name));
110 }
111 }
112
113 for field in &schema.fields {
115 if field.constraints.unique {
116 if let Some(value) = row.get(&field.name) {
117 if value.is_null() && !field.constraints.nullable {
118 errors.push(format!("Row {}: Unique field '{}' cannot be null", index, field.name));
119 }
120 }
121 }
122 }
123
124 for (field_name, value) in row {
126 if let Some(field) = schema.fields.iter().find(|f| f.name == *field_name) {
127 if let Err(e) = self.validate_field_value(value, field) {
128 errors.push(format!("Row {}: Field '{}' - {}", index, field_name, e));
129 }
130 }
131 }
132 }
133
134 Ok(errors)
135 }
136
137 fn validate_field_value(&self, value: &Value, field: &crate::db::schema::FieldSchema) -> Result<()> {
139 use crate::db::schema::DataType;
140
141 match (&field.data_type, value) {
142 (DataType::String { max_length }, Value::String(s)) => {
143 if let Some(max_len) = max_length {
144 if s.len() > *max_len {
145 return Err(DataForgeError::validation(&format!(
146 "String length {} exceeds maximum {}", s.len(), max_len
147 )));
148 }
149 }
150 },
151 (DataType::Integer { min, max }, Value::Number(n)) => {
152 if let Some(i) = n.as_i64() {
153 if let Some(min_val) = min {
154 if i < *min_val {
155 return Err(DataForgeError::validation(&format!(
156 "Integer {} is less than minimum {}", i, min_val
157 )));
158 }
159 }
160 if let Some(max_val) = max {
161 if i > *max_val {
162 return Err(DataForgeError::validation(&format!(
163 "Integer {} exceeds maximum {}", i, max_val
164 )));
165 }
166 }
167 } else {
168 return Err(DataForgeError::validation("Expected integer value"));
169 }
170 },
171 (DataType::Email, Value::String(s)) => {
172 if !s.contains('@') || !s.contains('.') {
173 return Err(DataForgeError::validation("Invalid email format"));
174 }
175 },
176 (DataType::Phone { .. }, Value::String(s)) => {
177 if s.is_empty() || s.len() > 20 {
178 return Err(DataForgeError::validation("Invalid phone number format"));
179 }
180 },
181 _ => {} }
183
184 Ok(())
185 }
186
187 fn generate_insert_sql(&self, table_name: &str, data: &[HashMap<String, Value>]) -> Result<String> {
189 if data.is_empty() {
190 return Err(DataForgeError::validation("No data to insert"));
191 }
192
193 let first_row = &data[0];
194 let columns: Vec<&String> = first_row.keys().collect();
195
196 let mut sql = format!("INSERT INTO {} (", table_name);
197 sql.push_str(&columns.iter().map(|c| c.as_str()).collect::<Vec<_>>().join(", "));
198 sql.push_str(") VALUES ");
199
200 let value_clauses: Vec<String> = data.iter().map(|row| {
201 let values: Vec<String> = columns.iter().map(|col| {
202 match row.get(*col) {
203 Some(Value::String(s)) => format!("'{}'", s.replace("'", "''")),
204 Some(Value::Number(n)) => n.to_string(),
205 Some(Value::Bool(b)) => if *b { "TRUE" } else { "FALSE" }.to_string(),
206 Some(Value::Null) | None => "NULL".to_string(),
207 Some(other) => format!("'{}'", other.to_string().replace("'", "''")),
208 }
209 }).collect();
210 format!("({})", values.join(", "))
211 }).collect();
212
213 sql.push_str(&value_clauses.join(", "));
214 Ok(sql)
215 }
216
217 fn execute_sql(&self, sql: &str) -> Result<()> {
219 match &self.connection_manager.database_type() {
221 crate::filling::DatabaseType::MySQL => {
222 self.execute_mysql_sql(sql)
223 },
224 crate::filling::DatabaseType::PostgreSQL => {
225 self.execute_postgres_sql(sql)
226 },
227 crate::filling::DatabaseType::SQLite => {
228 self.execute_sqlite_sql(sql)
229 },
230 }
231 }
232
233 fn execute_mysql_sql(&self, sql: &str) -> Result<()> {
235 #[cfg(feature = "database")]
236 {
237 let connection_string = self.connection_manager.get_connection_string()?;
238 let rt = tokio::runtime::Runtime::new()
239 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
240
241 rt.block_on(async {
242 let pool = sqlx::mysql::MySqlPoolOptions::new()
243 .max_connections(1)
244 .connect(&connection_string)
245 .await
246 .map_err(|e| DataForgeError::database(&format!("Failed to connect to MySQL: {}", e)))?;
247
248 sqlx::query(sql)
249 .execute(&pool)
250 .await
251 .map_err(|e| DataForgeError::database(&format!("Failed to execute SQL: {}", e)))?;
252
253 pool.close().await;
254 Ok(())
255 })
256 }
257 #[cfg(not(feature = "database"))]
258 {
259 Err(DataForgeError::database("Database feature not enabled"))
260 }
261 }
262
263 fn execute_postgres_sql(&self, sql: &str) -> Result<()> {
265 #[cfg(feature = "database")]
266 {
267 let connection_string = self.connection_manager.get_connection_string()?;
268 let rt = tokio::runtime::Runtime::new()
269 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
270
271 rt.block_on(async {
272 let pool = sqlx::postgres::PgPoolOptions::new()
273 .max_connections(1)
274 .connect(&connection_string)
275 .await
276 .map_err(|e| DataForgeError::database(&format!("Failed to connect to PostgreSQL: {}", e)))?;
277
278 sqlx::query(sql)
279 .execute(&pool)
280 .await
281 .map_err(|e| DataForgeError::database(&format!("Failed to execute SQL: {}", e)))?;
282
283 pool.close().await;
284 Ok(())
285 })
286 }
287 #[cfg(not(feature = "database"))]
288 {
289 Err(DataForgeError::database("Database feature not enabled"))
290 }
291 }
292
293 fn execute_sqlite_sql(&self, sql: &str) -> Result<()> {
295 #[cfg(feature = "database")]
296 {
297 let connection_string = self.connection_manager.get_connection_string()?;
298 let rt = tokio::runtime::Runtime::new()
299 .map_err(|e| DataForgeError::database(&format!("Failed to create runtime: {}", e)))?;
300
301 rt.block_on(async {
302 let pool = sqlx::sqlite::SqlitePoolOptions::new()
303 .max_connections(1)
304 .connect(&connection_string)
305 .await
306 .map_err(|e| DataForgeError::database(&format!("Failed to connect to SQLite: {}", e)))?;
307
308 sqlx::query(sql)
309 .execute(&pool)
310 .await
311 .map_err(|e| DataForgeError::database(&format!("Failed to execute SQL: {}", e)))?;
312
313 pool.close().await;
314 Ok(())
315 })
316 }
317 #[cfg(not(feature = "database"))]
318 {
319 Err(DataForgeError::database("Database feature not enabled"))
320 }
321 }
322
323 fn generate_create_table_sql_mysql(&self, schema: &TableSchema) -> Result<String> {
325 use crate::db::schema::DataType;
326
327 let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
328
329 let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
330 let data_type = match &field.data_type {
331 DataType::String { max_length } => {
332 format!("VARCHAR({})", max_length.unwrap_or(255))
333 },
334 DataType::Integer { .. } => "INT".to_string(),
335 DataType::Float { .. } => "FLOAT".to_string(),
336 DataType::Boolean => "BOOLEAN".to_string(),
337 DataType::DateTime { .. } => "DATETIME".to_string(),
338 DataType::Date { .. } => "DATE".to_string(),
339 DataType::Email => "VARCHAR(255)".to_string(),
340 DataType::Phone { .. } => "VARCHAR(20)".to_string(),
341 _ => "TEXT".to_string(),
342 };
343
344 let mut def = format!("{} {}", field.name, data_type);
345
346 if !field.constraints.nullable {
347 def.push_str(" NOT NULL");
348 }
349
350 if field.constraints.unique {
351 def.push_str(" UNIQUE");
352 }
353
354 def
355 }).collect();
356
357 sql.push_str(&field_definitions.join(", "));
358
359 if let Some(ref pk) = schema.primary_key {
360 sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
361 }
362
363 sql.push_str(")");
364 Ok(sql)
365 }
366
367 fn generate_create_table_sql_postgres(&self, schema: &TableSchema) -> Result<String> {
369 use crate::db::schema::DataType;
370
371 let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
372
373 let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
374 let data_type = match &field.data_type {
375 DataType::String { max_length } => {
376 format!("VARCHAR({})", max_length.unwrap_or(255))
377 },
378 DataType::Integer { .. } => "INTEGER".to_string(),
379 DataType::Float { .. } => "REAL".to_string(),
380 DataType::Boolean => "BOOLEAN".to_string(),
381 DataType::DateTime { .. } => "TIMESTAMP".to_string(),
382 DataType::Date { .. } => "DATE".to_string(),
383 DataType::Email => "VARCHAR(255)".to_string(),
384 DataType::Phone { .. } => "VARCHAR(20)".to_string(),
385 _ => "TEXT".to_string(),
386 };
387
388 let mut def = format!("{} {}", field.name, data_type);
389
390 if !field.constraints.nullable {
391 def.push_str(" NOT NULL");
392 }
393
394 if field.constraints.unique {
395 def.push_str(" UNIQUE");
396 }
397
398 def
399 }).collect();
400
401 sql.push_str(&field_definitions.join(", "));
402
403 if let Some(ref pk) = schema.primary_key {
404 sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
405 }
406
407 sql.push_str(")");
408 Ok(sql)
409 }
410
411 fn generate_create_table_sql_sqlite(&self, schema: &TableSchema) -> Result<String> {
413 use crate::db::schema::DataType;
414
415 let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", schema.name);
416
417 let field_definitions: Vec<String> = schema.fields.iter().map(|field| {
418 let data_type = match &field.data_type {
419 DataType::String { .. } => "TEXT".to_string(),
420 DataType::Integer { .. } => "INTEGER".to_string(),
421 DataType::Float { .. } => "REAL".to_string(),
422 DataType::Boolean => "INTEGER".to_string(), DataType::DateTime { .. } => "TEXT".to_string(), DataType::Date { .. } => "TEXT".to_string(),
425 DataType::Email => "TEXT".to_string(),
426 DataType::Phone { .. } => "TEXT".to_string(),
427 _ => "TEXT".to_string(),
428 };
429
430 let mut def = format!("{} {}", field.name, data_type);
431
432 if !field.constraints.nullable {
433 def.push_str(" NOT NULL");
434 }
435
436 if field.constraints.unique {
437 def.push_str(" UNIQUE");
438 }
439
440 def
441 }).collect();
442
443 sql.push_str(&field_definitions.join(", "));
444
445 if let Some(ref pk) = schema.primary_key {
446 sql.push_str(&format!(", PRIMARY KEY ({})", pk.join(", ")));
447 }
448
449 sql.push_str(")");
450 Ok(sql)
451 }
452}
453
454impl DatabaseFiller for FillingExecutor {
455 fn create_table(&mut self, schema: &TableSchema) -> Result<()> {
456 match &self.connection_manager.database_type() {
458 crate::filling::DatabaseType::MySQL => {
459 let sql = self.generate_create_table_sql_mysql(schema)?;
460 self.execute_sql(&sql)
461 },
462 crate::filling::DatabaseType::PostgreSQL => {
463 let sql = self.generate_create_table_sql_postgres(schema)?;
464 self.execute_sql(&sql)
465 },
466 crate::filling::DatabaseType::SQLite => {
467 let sql = self.generate_create_table_sql_sqlite(schema)?;
468 self.execute_sql(&sql)
469 },
470 }
471 }
472
473 fn insert_data(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
474 if data.is_empty() {
475 return Ok(());
476 }
477
478 let sql = self.generate_insert_sql(table_name, &data)?;
480 self.execute_sql(&sql)?;
481
482 self.stats.successful_rows += data.len();
483 Ok(())
484 }
485
486 fn batch_insert(&mut self, table_name: &str, data: Vec<HashMap<String, Value>>) -> Result<()> {
487 let chunks: Vec<_> = data.chunks(self.config.batch_size).collect();
488
489 for chunk in chunks {
490 let sql = self.generate_insert_sql(table_name, &chunk.to_vec())?;
491 self.execute_sql(&sql)?;
492 self.stats.successful_rows += chunk.len();
493 }
494
495 Ok(())
496 }
497
498 fn get_stats(&self) -> FillingStats {
499 self.stats.clone()
500 }
501
502 fn truncate_table(&mut self, table_name: &str) -> Result<()> {
503 let sql = format!("TRUNCATE TABLE {}", table_name);
505 self.execute_sql(&sql)
506 }
507
508 fn drop_table(&mut self, table_name: &str) -> Result<()> {
509 let sql = format!("DROP TABLE IF EXISTS {}", table_name);
511 self.execute_sql(&sql)
512 }
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filling::ConnectionConfig;
    use crate::db::schema::{FieldSchema, FieldConstraints, DataType};

    /// Two-column (id, name) schema shared by the tests below.
    fn create_test_schema() -> TableSchema {
        let id_field = FieldSchema {
            name: "id".to_string(),
            data_type: DataType::Integer { min: None, max: None },
            constraints: FieldConstraints {
                nullable: false,
                unique: true,
                default: None,
                pattern: None,
                min: None,
                max: None,
            },
            description: None,
            generator_config: None,
            generator_type: None,
        };

        let name_field = FieldSchema {
            name: "name".to_string(),
            data_type: DataType::String { max_length: Some(100) },
            constraints: FieldConstraints {
                nullable: false,
                unique: false,
                default: None,
                pattern: None,
                min: None,
                max: None,
            },
            description: None,
            generator_config: None,
            generator_type: None,
        };

        TableSchema {
            name: "test_table".to_string(),
            fields: vec![id_field, name_field],
            primary_key: Some(vec!["id".to_string()]),
            indexes: vec![],
            description: None,
        }
    }

    /// Builds a single (id, name) row in the shape the executor expects.
    fn row(id: i64, name: &str) -> HashMap<String, Value> {
        let mut r = HashMap::new();
        r.insert("id".to_string(), Value::Number(serde_json::Number::from(id)));
        r.insert("name".to_string(), Value::String(name.to_string()));
        r
    }

    fn create_test_data() -> Vec<HashMap<String, Value>> {
        vec![row(1, "Alice"), row(2, "Bob")]
    }

    /// Executor backed by a throwaway SQLite connection config.
    fn make_executor() -> FillingExecutor {
        let config = ConnectionConfig::sqlite("test.db");
        let connection_manager = ConnectionManager::new(config).unwrap();
        FillingExecutor::new(connection_manager, FillingConfig::default())
    }

    #[test]
    fn test_filling_executor_creation() {
        let executor = make_executor();
        assert_eq!(executor.stats.total_rows, 0);
    }

    #[test]
    fn test_validate_data() {
        let executor = make_executor();

        let errors = executor
            .validate_data(&create_test_data(), &create_test_schema())
            .unwrap();
        assert!(errors.is_empty());
    }

    #[test]
    fn test_generate_insert_sql() {
        let executor = make_executor();

        let sql = executor
            .generate_insert_sql("test_table", &create_test_data())
            .unwrap();

        for needle in ["INSERT INTO test_table", "VALUES", "Alice", "Bob"] {
            assert!(sql.contains(needle), "missing {} in {}", needle, sql);
        }
    }
}