1use anyhow::{Context, Result};
16use chrono::{Duration as ChronoDuration, Utc};
17use rand::rngs::StdRng;
18use rand::{Rng, SeedableRng};
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::path::Path;
22use std::time::{Duration, Instant};
23use uuid::Uuid;
24
/// Settings that describe one bulk-population run against a SQLite table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopulationConfig {
    /// Name of the table that is created (if missing) and filled.
    /// It is interpolated verbatim into the generated SQL statements.
    pub table_name: String,
    /// Number of rows to generate and insert.
    pub row_count: usize,
    /// Interval, in rows, at which insert progress is reported.
    pub batch_size: usize,
    /// Seed for the random number generator; `None` seeds it from entropy.
    pub seed: Option<u64>,
    /// Column definitions, in the order they appear in the table.
    pub columns: Vec<ColumnConfig>,
}
37
/// Describes a single column of the populated table and how its values are generated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnConfig {
    /// Column name, interpolated verbatim into the generated SQL statements.
    pub name: String,
    /// Logical type of the column; decides the SQLite column type used in `CREATE TABLE`.
    pub data_type: DataType,
    /// How generated values for this column are distributed.
    pub distribution: DataDistribution,
    /// When `false` the column is declared `NOT NULL`; when `true` the generator
    /// occasionally emits a NULL marker instead of a value.
    pub nullable: bool,
}
46
/// Logical column types supported by the generator.
///
/// `create_table_with_config` maps them onto SQLite column types: `Integer` and
/// `Boolean` become `INTEGER`, `Real` becomes `REAL`, and `Text`, `Date`,
/// `Timestamp` and `UUID` become `TEXT`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataType {
    /// Whole numbers.
    Integer,
    /// Free-form text.
    Text,
    /// Floating-point numbers.
    Real,
    /// Boolean flag, stored in an `INTEGER` column.
    Boolean,
    /// Calendar date, generated in `%Y-%m-%d` form.
    Date,
    /// Date and time, generated in `%Y-%m-%d %H:%M:%S` form.
    Timestamp,
    /// Randomly generated (version 4) UUID rendered as text.
    UUID,
}
58
/// Strategy used to pick generated values for a column.
///
/// How a strategy is interpreted depends on the column's [`DataType`]; see
/// `generate_row_values` for the combinations that are handled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataDistribution {
    /// Values drawn evenly from a fixed range.
    Uniform,
    /// Values centred on `mean`, with their spread controlled by `std_dev`.
    Normal { mean: f64, std_dev: f64 },
    /// Values derived from the zero-based row index.
    Sequential,
    /// Arbitrary random values (for text columns, a `text-<n>` string).
    Random,
    /// Values picked at random from the supplied list.
    Custom(Vec<String>),
}
68
69impl Default for PopulationConfig {
70 fn default() -> Self {
71 Self {
72 table_name: "large_table".to_string(),
73 row_count: 1_000_000,
74 batch_size: 10_000,
75 seed: None,
76 columns: vec![
77 ColumnConfig {
78 name: "id".to_string(),
79 data_type: DataType::Integer,
80 distribution: DataDistribution::Sequential,
81 nullable: false,
82 },
83 ColumnConfig {
84 name: "text_col".to_string(),
85 data_type: DataType::Text,
86 distribution: DataDistribution::Random,
87 nullable: true,
88 },
89 ColumnConfig {
90 name: "value".to_string(),
91 data_type: DataType::Real,
92 distribution: DataDistribution::Normal {
93 mean: 100.0,
94 std_dev: 15.0,
95 },
96 nullable: false,
97 },
98 ],
99 }
100 }
101}
102
103pub fn populate_database(db_path: &str, config: Option<PopulationConfig>) -> Result<()> {
125 println!("Connecting to database: {}", db_path);
126
127 validate_database_for_population(db_path)?;
129
130 let mut conn = create_connection_with_settings(db_path)?;
131
132 check_disk_space_requirements(db_path, &config)?;
134
135 let config = config.unwrap_or_default();
136 println!("Creating table '{}'...", config.table_name);
137 create_table_with_config(&conn, &config)?;
138
139 println!("Populating table with {} rows...", config.row_count);
140 println!(
141 "This may take a while. Progress will be shown every {} rows.",
142 config.batch_size
143 );
144
145 let start_time = Instant::now();
146
147 let result = populate_with_transaction(&mut conn, &config);
149
150 match result {
151 Ok(rows_inserted) => {
152 let duration = start_time.elapsed();
153 println!(
154 "Successfully populated table '{}' with {} rows",
155 config.table_name, rows_inserted
156 );
157 println!("Total time: {:.2} seconds", duration.as_secs_f64());
158 println!(
159 "Average: {:.0} rows/second",
160 rows_inserted as f64 / duration.as_secs_f64()
161 );
162 }
163 Err(e) => {
164 eprintln!("Population failed: {}", e);
165 eprintln!("Attempting to rollback any partial changes...");
166
167 if let Err(cleanup_err) = cleanup_failed_population(&conn, &config.table_name) {
169 eprintln!("Warning: Cleanup failed: {}", cleanup_err);
170 eprintln!("You may need to manually drop the table if it was partially created.");
171 } else {
172 println!("Cleanup completed successfully");
173 }
174
175 return Err(e);
176 }
177 }
178
179 verify_population_success(&conn, &config)?;
181
182 Ok(())
183}
184
185fn validate_database_for_population(db_path: &str) -> Result<()> {
186 if !Path::new(db_path).exists() {
187 anyhow::bail!(
188 "Database '{}' does not exist. Create it first with 'init' command.",
189 db_path
190 );
191 }
192
193 let metadata = std::fs::metadata(db_path)
194 .with_context(|| format!("Cannot read database file '{}'", db_path))?;
195
196 if metadata.is_dir() {
197 anyhow::bail!("'{}' is a directory, not a database file", db_path);
198 }
199
200 Ok(())
201}
202
203fn create_connection_with_settings(db_path: &str) -> Result<Connection> {
204 let conn = Connection::open(db_path)
205 .with_context(|| format!("Failed to connect to database: {}", db_path))?;
206
207 conn.pragma_update(None, "synchronous", "OFF")
209 .context("Failed to disable synchronous mode")?;
210
211 conn.pragma_update(None, "journal_mode", "MEMORY")
212 .context("Failed to set journal mode to memory")?;
213
214 conn.pragma_update(None, "cache_size", "10000")
215 .context("Failed to increase cache size")?;
216
217 println!("Database configured for bulk insert performance");
218
219 Ok(conn)
220}
221
222fn check_disk_space_requirements(db_path: &str, config: &Option<PopulationConfig>) -> Result<()> {
223 let default_config = PopulationConfig::default();
224 let config = config.as_ref().unwrap_or(&default_config);
225 let avg_row_size = estimate_row_size(&config.columns);
227 let estimated_size_mb = (avg_row_size * config.row_count) as f64 / (1024.0 * 1024.0);
228
229 println!("Estimated space needed: ~{:.1} MB", estimated_size_mb);
230
231 if let Ok(metadata) = std::fs::metadata(db_path) {
233 if metadata.len() == 0 {
234 eprintln!("Warning: Database file appears to be empty");
235 }
236 }
237
238 println!("Ensure you have sufficient disk space before proceeding");
239 Ok(())
240}
241
242fn estimate_row_size(columns: &[ColumnConfig]) -> usize {
243 columns
244 .iter()
245 .map(|col| match col.data_type {
246 DataType::Integer => 8,
247 DataType::Text => 50, DataType::Real => 8,
249 DataType::Boolean => 1,
250 DataType::Date => 8,
251 DataType::Timestamp => 8,
252 DataType::UUID => 36,
253 })
254 .sum()
255}
256
257fn create_table_with_config(conn: &Connection, config: &PopulationConfig) -> Result<()> {
258 let column_defs: Vec<String> = config
259 .columns
260 .iter()
261 .map(|col| {
262 let type_str = match col.data_type {
263 DataType::Integer => "INTEGER",
264 DataType::Text => "TEXT",
265 DataType::Real => "REAL",
266 DataType::Boolean => "INTEGER",
267 DataType::Date => "TEXT",
268 DataType::Timestamp => "TEXT",
269 DataType::UUID => "TEXT",
270 };
271
272 let nullable = if col.nullable { "" } else { " NOT NULL" };
273 format!("{} {}{}", col.name, type_str, nullable)
274 })
275 .collect();
276
277 let create_table_sql = format!(
278 "CREATE TABLE IF NOT EXISTS {} ({})",
279 config.table_name,
280 column_defs.join(", ")
281 );
282
283 conn.execute(&create_table_sql, [])
284 .context("Failed to create table. Check database permissions and disk space.")?;
285
286 let existing_count: i64 = conn
288 .query_row(
289 &format!("SELECT COUNT(*) FROM {}", config.table_name),
290 [],
291 |row| row.get(0),
292 )
293 .context("Failed to check existing row count")?;
294
295 if existing_count > 0 {
296 println!(
297 "Table '{}' already contains {} rows",
298 config.table_name, existing_count
299 );
300 println!("Population will add {} more rows", config.row_count);
301 } else {
302 println!("Table '{}' created successfully", config.table_name);
303 }
304
305 Ok(())
306}
307
308fn populate_with_transaction(conn: &mut Connection, config: &PopulationConfig) -> Result<usize> {
309 let tx = conn.transaction().context("Failed to begin transaction")?;
310
311 let placeholders = (0..config.columns.len())
312 .map(|_| "?")
313 .collect::<Vec<_>>()
314 .join(", ");
315 let column_names: Vec<String> = config.columns.iter().map(|c| c.name.clone()).collect();
316 let column_names_str = column_names.join(", ");
317 let insert_sql = format!(
318 "INSERT INTO {} ({}) VALUES ({})",
319 config.table_name, column_names_str, placeholders
320 );
321
322 let mut stmt = tx
323 .prepare(&insert_sql)
324 .context("Failed to prepare insert statement")?;
325
326 let mut rng = if let Some(seed) = config.seed {
327 StdRng::seed_from_u64(seed)
328 } else {
329 StdRng::from_entropy()
330 };
331
332 let mut rows_inserted = 0;
333 let start_time = Instant::now();
334 let mut last_checkpoint = Instant::now();
335 let checkpoint_interval = Duration::from_secs(30);
336
337 for batch_start in (0..config.row_count).step_by(config.batch_size) {
338 let batch_end = std::cmp::min(batch_start + config.batch_size, config.row_count);
339
340 for i in batch_start..batch_end {
341 let values = generate_row_values(&config.columns, i, &mut rng);
342
343 match stmt.execute(rusqlite::params_from_iter(values)) {
344 Ok(_) => {
345 rows_inserted += 1;
346
347 if rows_inserted % config.batch_size == 0 {
349 let elapsed = start_time.elapsed();
350 let rate = rows_inserted as f64 / elapsed.as_secs_f64();
351 let eta = if rate > 0.0 {
352 Duration::from_secs(
353 ((config.row_count - rows_inserted) as f64 / rate) as u64,
354 )
355 } else {
356 Duration::from_secs(0)
357 };
358
359 println!(
360 "Progress: {}/{} rows ({:.1}%) - {:.0} rows/sec - ETA: {:?}",
361 rows_inserted,
362 config.row_count,
363 (rows_inserted as f64 / config.row_count as f64) * 100.0,
364 rate,
365 eta
366 );
367 }
368 }
369 Err(e) => {
370 eprintln!("Failed to insert row {}: {}", i + 1, e);
371
372 if is_transient_error(&e) && should_retry_insert(rows_inserted) {
374 eprintln!("Retrying row {}...", i + 1);
375 std::thread::sleep(Duration::from_millis(10));
376 continue;
377 } else {
378 return Err(e).with_context(|| {
379 format!("Failed to insert row {} after retries", i + 1)
380 });
381 }
382 }
383 }
384 }
385
386 if last_checkpoint.elapsed() >= checkpoint_interval {
388 println!("Creating checkpoint...");
389 tx.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])
390 .context("Failed to create checkpoint")?;
391 last_checkpoint = Instant::now();
392 }
393 }
394
395 println!("Committing transaction...");
396 drop(stmt); tx.commit()
398 .context("Failed to commit transaction. All changes have been rolled back.")?;
399
400 Ok(rows_inserted)
401}
402
403fn generate_row_values(
404 columns: &[ColumnConfig],
405 row_index: usize,
406 rng: &mut StdRng,
407) -> Vec<String> {
408 columns
409 .iter()
410 .map(|col| {
411 if col.nullable && rng.gen_bool(0.1) {
412 return "NULL".to_string();
414 }
415
416 match (&col.data_type, &col.distribution) {
417 (DataType::Integer, DataDistribution::Sequential) => row_index.to_string(),
418 (DataType::Integer, DataDistribution::Uniform) => {
419 rng.gen_range(0..1000).to_string()
420 }
421 (DataType::Integer, DataDistribution::Normal { mean, std_dev }) => {
422 let value = rng.gen_range(0.0..1.0);
423 let normal = (value - 0.5) * std_dev + mean;
424 (normal.round() as i64).to_string()
425 }
426 (DataType::Text, DataDistribution::Random) => {
427 format!("text-{}", rng.gen_range(0..1000))
428 }
429 (DataType::Text, DataDistribution::Custom(values)) => {
430 values[rng.gen_range(0..values.len())].clone()
431 }
432 (DataType::Real, DataDistribution::Normal { mean, std_dev }) => {
433 let value = rng.gen_range(0.0..1.0);
434 let normal = (value - 0.5) * std_dev + mean;
435 format!("{:.2}", normal)
436 }
437 (DataType::Boolean, _) => rng.gen_bool(0.5).to_string(),
438 (DataType::Date, _) => {
439 let days = rng.gen_range(0..365);
440 let date = Utc::now() - ChronoDuration::days(days);
441 date.format("%Y-%m-%d").to_string()
442 }
443 (DataType::Timestamp, _) => {
444 let seconds = rng.gen_range(0..86400);
445 let timestamp = Utc::now() - ChronoDuration::seconds(seconds);
446 timestamp.format("%Y-%m-%d %H:%M:%S").to_string()
447 }
448 (DataType::UUID, _) => Uuid::new_v4().to_string(),
449 _ => "".to_string(), }
451 })
452 .collect()
453}
454
455fn is_transient_error(error: &rusqlite::Error) -> bool {
456 match error {
457 rusqlite::Error::SqliteFailure(err, _) => {
458 matches!(
459 err.code,
460 rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
461 )
462 }
463 _ => false,
464 }
465}
466
/// Decides whether a failed insert may be retried, given how many rows have
/// been inserted so far: retries are allowed only below 100,000 rows.
fn should_retry_insert(rows_so_far: usize) -> bool {
    /// Row count at and above which failed inserts are no longer retried.
    const RETRY_ROW_LIMIT: usize = 100_000;

    rows_so_far < RETRY_ROW_LIMIT
}
470
471fn cleanup_failed_population(conn: &Connection, table_name: &str) -> Result<()> {
472 let table_exists: bool = conn
474 .query_row(
475 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
476 [table_name],
477 |row| {
478 let count: i64 = row.get(0)?;
479 Ok(count > 0)
480 },
481 )
482 .context("Failed to check if table exists")?;
483
484 if table_exists {
485 let row_count: i64 = conn
487 .query_row(&format!("SELECT COUNT(*) FROM {}", table_name), [], |row| {
488 row.get(0)
489 })
490 .context("Failed to count rows in table")?;
491
492 if row_count > 0 {
493 println!("Table '{}' contains {} partial rows", table_name, row_count);
494 println!(
495 "Run 'DROP TABLE {};' to remove it, or try populating again",
496 table_name
497 );
498 }
499 }
500
501 Ok(())
502}
503
504fn verify_population_success(conn: &Connection, config: &PopulationConfig) -> Result<()> {
505 let final_count: i64 = conn
506 .query_row(
507 &format!("SELECT COUNT(*) FROM {}", config.table_name),
508 [],
509 |row| row.get(0),
510 )
511 .context("Failed to verify population by counting rows")?;
512
513 let sample_row: Option<Vec<String>> = match conn.query_row(
515 &format!("SELECT * FROM {} LIMIT 1", config.table_name),
516 [],
517 |row| {
518 let mut values = Vec::new();
519 for i in 0..config.columns.len() {
520 let value = match config.columns[i].data_type {
522 DataType::Integer => row.get::<_, i64>(i)?.to_string(),
523 DataType::Text => row.get::<_, String>(i)?,
524 DataType::Real => row.get::<_, f64>(i)?.to_string(),
525 DataType::Boolean => row.get::<_, bool>(i)?.to_string(),
526 DataType::Date | DataType::Timestamp | DataType::UUID => {
527 row.get::<_, String>(i)?
528 }
529 };
530 values.push(value);
531 }
532 Ok(values)
533 },
534 ) {
535 Ok(data) => Some(data),
536 Err(rusqlite::Error::QueryReturnedNoRows) => None,
537 Err(e) => return Err(e).context("Failed to verify sample data"),
538 };
539
540 if let Some(values) = sample_row {
541 if values.len() == config.columns.len() {
542 println!("Data integrity verification passed");
543 } else {
544 eprintln!("Warning: Data integrity check failed - sample data doesn't match expected column count");
545 }
546 }
547
548 println!(
549 "Final row count: {} rows in '{}'",
550 final_count, config.table_name
551 );
552
553 if final_count >= config.row_count as i64 {
554 println!("Population completed successfully!");
555 } else {
556 eprintln!(
557 "Warning: Expected at least {} rows but found {}",
558 config.row_count, final_count
559 );
560 }
561
562 Ok(())
563}