1use anyhow::{Context, Result};
2use rusqlite::Connection;
3use std::path::Path;
4use std::time::{Duration, Instant};
5use serde::{Deserialize, Serialize};
6use rand::{Rng, SeedableRng};
7use rand::rngs::StdRng;
8use uuid::Uuid;
9use chrono::{Utc, Duration as ChronoDuration};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct PopulationConfig {
13 pub table_name: String,
14 pub row_count: usize,
15 pub batch_size: usize,
16 pub seed: Option<u64>,
17 pub columns: Vec<ColumnConfig>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ColumnConfig {
22 pub name: String,
23 pub data_type: DataType,
24 pub distribution: DataDistribution,
25 pub nullable: bool,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum DataType {
30 Integer,
31 Text,
32 Real,
33 Boolean,
34 Date,
35 Timestamp,
36 UUID,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum DataDistribution {
41 Uniform,
42 Normal { mean: f64, std_dev: f64 },
43 Sequential,
44 Random,
45 Custom(Vec<String>),
46}
47
48impl Default for PopulationConfig {
49 fn default() -> Self {
50 Self {
51 table_name: "large_table".to_string(),
52 row_count: 1_000_000,
53 batch_size: 10_000,
54 seed: None,
55 columns: vec![
56 ColumnConfig {
57 name: "id".to_string(),
58 data_type: DataType::Integer,
59 distribution: DataDistribution::Sequential,
60 nullable: false,
61 },
62 ColumnConfig {
63 name: "text_col".to_string(),
64 data_type: DataType::Text,
65 distribution: DataDistribution::Random,
66 nullable: true,
67 },
68 ColumnConfig {
69 name: "value".to_string(),
70 data_type: DataType::Real,
71 distribution: DataDistribution::Normal { mean: 100.0, std_dev: 15.0 },
72 nullable: false,
73 },
74 ],
75 }
76 }
77}
78
79pub fn populate_database(db_path: &str, config: Option<PopulationConfig>) -> Result<()> {
81 println!("Connecting to database: {}", db_path);
82
83 validate_database_for_population(db_path)?;
85
86 let mut conn = create_connection_with_settings(db_path)?;
87
88 check_disk_space_requirements(db_path, &config)?;
90
91 let config = config.unwrap_or_default();
92 println!("Creating table '{}'...", config.table_name);
93 create_table_with_config(&conn, &config)?;
94
95 println!("Populating table with {} rows...", config.row_count);
96 println!("This may take a while. Progress will be shown every {} rows.", config.batch_size);
97
98 let start_time = Instant::now();
99
100 let result = populate_with_transaction(&mut conn, &config);
102
103 match result {
104 Ok(rows_inserted) => {
105 let duration = start_time.elapsed();
106 println!("Successfully populated table '{}' with {} rows", config.table_name, rows_inserted);
107 println!("Total time: {:.2} seconds", duration.as_secs_f64());
108 println!("Average: {:.0} rows/second", rows_inserted as f64 / duration.as_secs_f64());
109 }
110 Err(e) => {
111 eprintln!("Population failed: {}", e);
112 eprintln!("Attempting to rollback any partial changes...");
113
114 if let Err(cleanup_err) = cleanup_failed_population(&conn, &config.table_name) {
116 eprintln!("Warning: Cleanup failed: {}", cleanup_err);
117 eprintln!("You may need to manually drop the table if it was partially created.");
118 } else {
119 println!("Cleanup completed successfully");
120 }
121
122 return Err(e);
123 }
124 }
125
126 verify_population_success(&conn, &config)?;
128
129 Ok(())
130}
131
132fn validate_database_for_population(db_path: &str) -> Result<()> {
133 if !Path::new(db_path).exists() {
134 anyhow::bail!("Database '{}' does not exist. Create it first with 'init' command.", db_path);
135 }
136
137 let metadata = std::fs::metadata(db_path)
138 .with_context(|| format!("Cannot read database file '{}'", db_path))?;
139
140 if metadata.is_dir() {
141 anyhow::bail!("'{}' is a directory, not a database file", db_path);
142 }
143
144 Ok(())
145}
146
147fn create_connection_with_settings(db_path: &str) -> Result<Connection> {
148 let conn = Connection::open(db_path)
149 .with_context(|| format!("Failed to connect to database: {}", db_path))?;
150
151 conn.pragma_update(None, "synchronous", "OFF")
153 .context("Failed to disable synchronous mode")?;
154
155 conn.pragma_update(None, "journal_mode", "MEMORY")
156 .context("Failed to set journal mode to memory")?;
157
158 conn.pragma_update(None, "cache_size", "10000")
159 .context("Failed to increase cache size")?;
160
161 println!("Database configured for bulk insert performance");
162
163 Ok(conn)
164}
165
166fn check_disk_space_requirements(db_path: &str, config: &Option<PopulationConfig>) -> Result<()> {
167 let default_config = PopulationConfig::default();
168 let config = config.as_ref().unwrap_or(&default_config);
169 let avg_row_size = estimate_row_size(&config.columns);
171 let estimated_size_mb = (avg_row_size * config.row_count) as f64 / (1024.0 * 1024.0);
172
173 println!("Estimated space needed: ~{:.1} MB", estimated_size_mb);
174
175 if let Ok(metadata) = std::fs::metadata(db_path) {
177 if metadata.len() == 0 {
178 eprintln!("Warning: Database file appears to be empty");
179 }
180 }
181
182 println!("Ensure you have sufficient disk space before proceeding");
183 Ok(())
184}
185
186fn estimate_row_size(columns: &[ColumnConfig]) -> usize {
187 columns.iter().map(|col| match col.data_type {
188 DataType::Integer => 8,
189 DataType::Text => 50, DataType::Real => 8,
191 DataType::Boolean => 1,
192 DataType::Date => 8,
193 DataType::Timestamp => 8,
194 DataType::UUID => 36,
195 }).sum()
196}
197
198fn create_table_with_config(conn: &Connection, config: &PopulationConfig) -> Result<()> {
199 let column_defs: Vec<String> = config.columns.iter().map(|col| {
200 let type_str = match col.data_type {
201 DataType::Integer => "INTEGER",
202 DataType::Text => "TEXT",
203 DataType::Real => "REAL",
204 DataType::Boolean => "INTEGER",
205 DataType::Date => "TEXT",
206 DataType::Timestamp => "TEXT",
207 DataType::UUID => "TEXT",
208 };
209
210 let nullable = if col.nullable { "" } else { " NOT NULL" };
211 format!("{} {}{}", col.name, type_str, nullable)
212 }).collect();
213
214 let create_table_sql = format!(
215 "CREATE TABLE IF NOT EXISTS {} ({})",
216 config.table_name,
217 column_defs.join(", ")
218 );
219
220 conn.execute(&create_table_sql, [])
221 .context("Failed to create table. Check database permissions and disk space.")?;
222
223 let existing_count: i64 = conn.query_row(
225 &format!("SELECT COUNT(*) FROM {}", config.table_name),
226 [],
227 |row| row.get(0)
228 ).context("Failed to check existing row count")?;
229
230 if existing_count > 0 {
231 println!("Table '{}' already contains {} rows", config.table_name, existing_count);
232 println!("Population will add {} more rows", config.row_count);
233 } else {
234 println!("Table '{}' created successfully", config.table_name);
235 }
236
237 Ok(())
238}
239
240fn populate_with_transaction(conn: &mut Connection, config: &PopulationConfig) -> Result<usize> {
241 let tx = conn.transaction()
242 .context("Failed to begin transaction")?;
243
244 let placeholders = (0..config.columns.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
245 let column_names: Vec<String> = config.columns.iter().map(|c| c.name.clone()).collect();
246 let column_names_str = column_names.join(", ");
247 let insert_sql = format!(
248 "INSERT INTO {} ({}) VALUES ({})",
249 config.table_name,
250 column_names_str,
251 placeholders
252 );
253
254 let mut stmt = tx.prepare(&insert_sql)
255 .context("Failed to prepare insert statement")?;
256
257 let mut rng = if let Some(seed) = config.seed {
258 StdRng::seed_from_u64(seed)
259 } else {
260 StdRng::from_entropy()
261 };
262
263 let mut rows_inserted = 0;
264 let start_time = Instant::now();
265 let mut last_checkpoint = Instant::now();
266 let checkpoint_interval = Duration::from_secs(30);
267
268 for batch_start in (0..config.row_count).step_by(config.batch_size) {
269 let batch_end = std::cmp::min(batch_start + config.batch_size, config.row_count);
270
271 for i in batch_start..batch_end {
272 let values = generate_row_values(&config.columns, i, &mut rng);
273
274 match stmt.execute(rusqlite::params_from_iter(values)) {
275 Ok(_) => {
276 rows_inserted += 1;
277
278 if rows_inserted % config.batch_size == 0 {
280 let elapsed = start_time.elapsed();
281 let rate = rows_inserted as f64 / elapsed.as_secs_f64();
282 let eta = if rate > 0.0 {
283 Duration::from_secs(((config.row_count - rows_inserted) as f64 / rate) as u64)
284 } else {
285 Duration::from_secs(0)
286 };
287
288 println!("Progress: {}/{} rows ({:.1}%) - {:.0} rows/sec - ETA: {:?}",
289 rows_inserted, config.row_count,
290 (rows_inserted as f64 / config.row_count as f64) * 100.0,
291 rate,
292 eta);
293 }
294 }
295 Err(e) => {
296 eprintln!("Failed to insert row {}: {}", i + 1, e);
297
298 if is_transient_error(&e) && should_retry_insert(rows_inserted) {
300 eprintln!("Retrying row {}...", i + 1);
301 std::thread::sleep(Duration::from_millis(10));
302 continue;
303 } else {
304 return Err(e).with_context(|| format!("Failed to insert row {} after retries", i + 1));
305 }
306 }
307 }
308 }
309
310 if last_checkpoint.elapsed() >= checkpoint_interval {
312 println!("Creating checkpoint...");
313 tx.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])
314 .context("Failed to create checkpoint")?;
315 last_checkpoint = Instant::now();
316 }
317 }
318
319 println!("Committing transaction...");
320 drop(stmt); tx.commit()
322 .context("Failed to commit transaction. All changes have been rolled back.")?;
323
324 Ok(rows_inserted)
325}
326
327fn generate_row_values(columns: &[ColumnConfig], row_index: usize, rng: &mut StdRng) -> Vec<String> {
328 columns.iter().map(|col| {
329 if col.nullable && rng.gen_bool(0.1) { return "NULL".to_string();
331 }
332
333 match (&col.data_type, &col.distribution) {
334 (DataType::Integer, DataDistribution::Sequential) => row_index.to_string(),
335 (DataType::Integer, DataDistribution::Uniform) => rng.gen_range(0..1000).to_string(),
336 (DataType::Integer, DataDistribution::Normal { mean, std_dev }) => {
337 let value = rng.gen_range(0.0..1.0);
338 let normal = (value - 0.5) * std_dev + mean;
339 (normal.round() as i64).to_string()
340 },
341 (DataType::Text, DataDistribution::Random) => {
342 format!("text-{}", rng.gen_range(0..1000))
343 },
344 (DataType::Text, DataDistribution::Custom(values)) => {
345 values[rng.gen_range(0..values.len())].clone()
346 },
347 (DataType::Real, DataDistribution::Normal { mean, std_dev }) => {
348 let value = rng.gen_range(0.0..1.0);
349 let normal = (value - 0.5) * std_dev + mean;
350 format!("{:.2}", normal)
351 },
352 (DataType::Boolean, _) => rng.gen_bool(0.5).to_string(),
353 (DataType::Date, _) => {
354 let days = rng.gen_range(0..365);
355 let date = Utc::now() - ChronoDuration::days(days);
356 date.format("%Y-%m-%d").to_string()
357 },
358 (DataType::Timestamp, _) => {
359 let seconds = rng.gen_range(0..86400);
360 let timestamp = Utc::now() - ChronoDuration::seconds(seconds);
361 timestamp.format("%Y-%m-%d %H:%M:%S").to_string()
362 },
363 (DataType::UUID, _) => Uuid::new_v4().to_string(),
364 _ => "".to_string(), }
366 }).collect()
367}
368
369fn is_transient_error(error: &rusqlite::Error) -> bool {
370 match error {
371 rusqlite::Error::SqliteFailure(err, _) => {
372 matches!(err.code, rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked)
373 }
374 _ => false
375 }
376}
377
/// Decides whether a failed insert may be retried.
///
/// Retries are allowed only while fewer than 100 000 rows have been inserted.
fn should_retry_insert(rows_so_far: usize) -> bool {
    /// Row count from which retries are no longer attempted.
    const RETRY_ROW_LIMIT: usize = 100_000;

    rows_so_far < RETRY_ROW_LIMIT
}
381
382fn cleanup_failed_population(conn: &Connection, table_name: &str) -> Result<()> {
383 let table_exists: bool = conn.query_row(
385 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
386 [table_name],
387 |row| {
388 let count: i64 = row.get(0)?;
389 Ok(count > 0)
390 }
391 ).context("Failed to check if table exists")?;
392
393 if table_exists {
394 let row_count: i64 = conn.query_row(
396 &format!("SELECT COUNT(*) FROM {}", table_name),
397 [],
398 |row| row.get(0)
399 ).context("Failed to count rows in table")?;
400
401 if row_count > 0 {
402 println!("Table '{}' contains {} partial rows", table_name, row_count);
403 println!("Run 'DROP TABLE {};' to remove it, or try populating again", table_name);
404 }
405 }
406
407 Ok(())
408}
409
410fn verify_population_success(conn: &Connection, config: &PopulationConfig) -> Result<()> {
411 let final_count: i64 = conn.query_row(
412 &format!("SELECT COUNT(*) FROM {}", config.table_name),
413 [],
414 |row| row.get(0)
415 ).context("Failed to verify population by counting rows")?;
416
417 let sample_row: Option<Vec<String>> = match conn.query_row(
419 &format!("SELECT * FROM {} LIMIT 1", config.table_name),
420 [],
421 |row| {
422 let mut values = Vec::new();
423 for i in 0..config.columns.len() {
424 let value = match config.columns[i].data_type {
426 DataType::Integer => row.get::<_, i64>(i)?.to_string(),
427 DataType::Text => row.get::<_, String>(i)?,
428 DataType::Real => row.get::<_, f64>(i)?.to_string(),
429 DataType::Boolean => row.get::<_, bool>(i)?.to_string(),
430 DataType::Date | DataType::Timestamp | DataType::UUID => row.get::<_, String>(i)?,
431 };
432 values.push(value);
433 }
434 Ok(values)
435 }
436 ) {
437 Ok(data) => Some(data),
438 Err(rusqlite::Error::QueryReturnedNoRows) => None,
439 Err(e) => return Err(e).context("Failed to verify sample data"),
440 };
441
442 if let Some(values) = sample_row {
443 if values.len() == config.columns.len() {
444 println!("Data integrity verification passed");
445 } else {
446 eprintln!("Warning: Data integrity check failed - sample data doesn't match expected column count");
447 }
448 }
449
450 println!("Final row count: {} rows in '{}'", final_count, config.table_name);
451
452 if final_count >= config.row_count as i64 {
453 println!("Population completed successfully!");
454 } else {
455 eprintln!("Warning: Expected at least {} rows but found {}", config.row_count, final_count);
456 }
457
458 Ok(())
459}