vapor_cli/
populate.rs

1//! # Database Population with Synthetic Data
2//!
3//! This module provides a powerful and configurable system for populating a database with
4//! large amounts of synthetic data. It is designed for performance testing, schema validation,
5//! and generating realistic-looking datasets for development purposes.
6//!
7//! ## Core Features:
8//! - **Configurable Data Generation**: Define table structure, row count, and data types via a `PopulationConfig` struct.
9//! - **Rich Data Types**: Supports common types like `Integer`, `Text`, `Real`, `Boolean`, `Date`, `Timestamp`, and `UUID`.
10//! - **Varied Data Distributions**: Generate data that is sequential, random, uniform, or follows a normal distribution.
11//! - **High Performance**: Uses bulk `INSERT` statements, transactions, and optimized SQLite PRAGMA settings for speed.
12//! - **Robust Error Handling**: Includes pre-flight checks, progress tracking, and cleanup procedures for failed runs.
13//! - **Reproducibility**: Population can be made deterministic by providing a seed value.
14
15use anyhow::{Context, Result};
16use chrono::{Duration as ChronoDuration, Utc};
17use rand::rngs::StdRng;
18use rand::{Rng, SeedableRng};
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::path::Path;
22use std::time::{Duration, Instant};
23use uuid::Uuid;
24
/// Defines the complete configuration for a database population task.
///
/// This struct specifies the target table, the number of rows to generate, performance settings,
/// and a detailed schema for each column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopulationConfig {
    /// Name of the table that is created (if it does not exist) and populated.
    ///
    /// The name is interpolated into the generated SQL text as-is, so it must be a
    /// valid, unquoted SQL identifier.
    pub table_name: String,
    /// Number of rows to generate and insert.
    pub row_count: usize,
    /// Number of rows per batch; also the interval at which progress is printed.
    /// Must be greater than zero.
    pub batch_size: usize,
    /// Seed for the random number generator. `Some(seed)` seeds the generator with that
    /// value; `None` seeds it from system entropy.
    ///
    /// Note that date and timestamp values are computed relative to the current time and
    /// UUIDs come from `Uuid::new_v4()`, so those columns vary between runs even with a seed.
    pub seed: Option<u64>,
    /// Column definitions, in the order they appear in the `CREATE TABLE` and `INSERT`
    /// statements.
    pub columns: Vec<ColumnConfig>,
}
37
/// Configuration for a single column within a table to be populated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnConfig {
    /// Column name, interpolated as-is into the generated SQL (must be a valid identifier).
    pub name: String,
    /// Logical data type; determines the SQLite column type and the value format.
    pub data_type: DataType,
    /// How values for this column are generated. Not every distribution is implemented
    /// for every data type; see [`DataDistribution`].
    pub distribution: DataDistribution,
    /// When `false` the column is declared `NOT NULL`. When `true` the generator emits
    /// its NULL marker instead of a value with 10% probability per row.
    pub nullable: bool,
}
46
/// Enumerates the supported data types for synthetic data generation.
///
/// Each variant maps to a SQLite column type when the table is created.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataType {
    /// Stored in an `INTEGER` column.
    Integer,
    /// Stored in a `TEXT` column.
    Text,
    /// Stored in a `REAL` column.
    Real,
    /// Stored in an `INTEGER` column.
    Boolean,
    /// Stored in a `TEXT` column, formatted as `%Y-%m-%d`; generated as a day within
    /// the 365 days before the current UTC time.
    Date,
    /// Stored in a `TEXT` column, formatted as `%Y-%m-%d %H:%M:%S`; generated as an
    /// instant within the 86400 seconds before the current UTC time.
    Timestamp,
    /// Stored in a `TEXT` column; generated with `Uuid::new_v4()`.
    UUID,
}
58
/// Defines the statistical distribution or pattern for generating data in a column.
///
/// The generator only implements specific (data type, distribution) pairs, noted on each
/// variant. `Boolean`, `Date`, `Timestamp` and `UUID` columns ignore the distribution.
/// Any other unlisted pair yields an empty string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataDistribution {
    /// Implemented for `Integer`: a value drawn from the range `0..1000`.
    Uniform,
    /// Implemented for `Integer` (rounded) and `Real` (two decimal places), parameterised
    /// by `mean` and `std_dev`.
    Normal { mean: f64, std_dev: f64 },
    /// Implemented for `Integer`: the zero-based index of the row being generated.
    Sequential,
    /// Implemented for `Text`: the string `text-N` with `N` drawn from `0..1000`.
    Random,
    /// Implemented for `Text`: one of the supplied strings, chosen at random.
    Custom(Vec<String>),
}
68
69impl Default for PopulationConfig {
70    fn default() -> Self {
71        Self {
72            table_name: "large_table".to_string(),
73            row_count: 1_000_000,
74            batch_size: 10_000,
75            seed: None,
76            columns: vec![
77                ColumnConfig {
78                    name: "id".to_string(),
79                    data_type: DataType::Integer,
80                    distribution: DataDistribution::Sequential,
81                    nullable: false,
82                },
83                ColumnConfig {
84                    name: "text_col".to_string(),
85                    data_type: DataType::Text,
86                    distribution: DataDistribution::Random,
87                    nullable: true,
88                },
89                ColumnConfig {
90                    name: "value".to_string(),
91                    data_type: DataType::Real,
92                    distribution: DataDistribution::Normal {
93                        mean: 100.0,
94                        std_dev: 15.0,
95                    },
96                    nullable: false,
97                },
98            ],
99        }
100    }
101}
102
103/// Populates a database with a large volume of synthetic data based on a provided configuration.
104///
105/// This is the main entry point for the population functionality. It orchestrates the entire
106/// process, including:
107/// 1. Validating the database file.
108/// 2. Optimizing the database connection for bulk writes.
109/// 3. Estimating disk space requirements.
110/// 4. Creating the target table if it doesn't exist.
111/// 5. Inserting data in batches within a single transaction.
112/// 6. Providing progress updates and performance metrics.
113/// 7. Verifying the final row count.
114///
115/// # Arguments
116///
117/// * `db_path` - The file path to the SQLite database.
118/// * `config` - An `Option<PopulationConfig>` that defines the population parameters.
119///              If `None`, a default configuration is used.
120///
121/// # Returns
122///
123/// A `Result` which is `Ok(())` on success, or an `Err` if any part of the process fails.
124pub fn populate_database(db_path: &str, config: Option<PopulationConfig>) -> Result<()> {
125    println!("Connecting to database: {}", db_path);
126
127    // Validate database exists and is accessible
128    validate_database_for_population(db_path)?;
129
130    let mut conn = create_connection_with_settings(db_path)?;
131
132    // Check available disk space before starting
133    check_disk_space_requirements(db_path, &config)?;
134
135    let config = config.unwrap_or_default();
136    println!("Creating table '{}'...", config.table_name);
137    create_table_with_config(&conn, &config)?;
138
139    println!("Populating table with {} rows...", config.row_count);
140    println!(
141        "This may take a while. Progress will be shown every {} rows.",
142        config.batch_size
143    );
144
145    let start_time = Instant::now();
146
147    // Use transaction for better performance and atomicity
148    let result = populate_with_transaction(&mut conn, &config);
149
150    match result {
151        Ok(rows_inserted) => {
152            let duration = start_time.elapsed();
153            println!(
154                "Successfully populated table '{}' with {} rows",
155                config.table_name, rows_inserted
156            );
157            println!("Total time: {:.2} seconds", duration.as_secs_f64());
158            println!(
159                "Average: {:.0} rows/second",
160                rows_inserted as f64 / duration.as_secs_f64()
161            );
162        }
163        Err(e) => {
164            eprintln!("Population failed: {}", e);
165            eprintln!("Attempting to rollback any partial changes...");
166
167            // Try to clean up any partial data
168            if let Err(cleanup_err) = cleanup_failed_population(&conn, &config.table_name) {
169                eprintln!("Warning: Cleanup failed: {}", cleanup_err);
170                eprintln!("You may need to manually drop the table if it was partially created.");
171            } else {
172                println!("Cleanup completed successfully");
173            }
174
175            return Err(e);
176        }
177    }
178
179    // Verify the population was successful
180    verify_population_success(&conn, &config)?;
181
182    Ok(())
183}
184
185fn validate_database_for_population(db_path: &str) -> Result<()> {
186    if !Path::new(db_path).exists() {
187        anyhow::bail!(
188            "Database '{}' does not exist. Create it first with 'init' command.",
189            db_path
190        );
191    }
192
193    let metadata = std::fs::metadata(db_path)
194        .with_context(|| format!("Cannot read database file '{}'", db_path))?;
195
196    if metadata.is_dir() {
197        anyhow::bail!("'{}' is a directory, not a database file", db_path);
198    }
199
200    Ok(())
201}
202
203fn create_connection_with_settings(db_path: &str) -> Result<Connection> {
204    let conn = Connection::open(db_path)
205        .with_context(|| format!("Failed to connect to database: {}", db_path))?;
206
207    // Configure SQLite for better performance during bulk inserts
208    conn.pragma_update(None, "synchronous", "OFF")
209        .context("Failed to disable synchronous mode")?;
210
211    conn.pragma_update(None, "journal_mode", "MEMORY")
212        .context("Failed to set journal mode to memory")?;
213
214    conn.pragma_update(None, "cache_size", "10000")
215        .context("Failed to increase cache size")?;
216
217    println!("Database configured for bulk insert performance");
218
219    Ok(conn)
220}
221
222fn check_disk_space_requirements(db_path: &str, config: &Option<PopulationConfig>) -> Result<()> {
223    let default_config = PopulationConfig::default();
224    let config = config.as_ref().unwrap_or(&default_config);
225    // Estimate space needed based on column types and row count
226    let avg_row_size = estimate_row_size(&config.columns);
227    let estimated_size_mb = (avg_row_size * config.row_count) as f64 / (1024.0 * 1024.0);
228
229    println!("Estimated space needed: ~{:.1} MB", estimated_size_mb);
230
231    // Try to get available space (this is platform-specific, so we'll make it non-fatal)
232    if let Ok(metadata) = std::fs::metadata(db_path) {
233        if metadata.len() == 0 {
234            eprintln!("Warning: Database file appears to be empty");
235        }
236    }
237
238    println!("Ensure you have sufficient disk space before proceeding");
239    Ok(())
240}
241
242fn estimate_row_size(columns: &[ColumnConfig]) -> usize {
243    columns
244        .iter()
245        .map(|col| match col.data_type {
246            DataType::Integer => 8,
247            DataType::Text => 50, // Average text length
248            DataType::Real => 8,
249            DataType::Boolean => 1,
250            DataType::Date => 8,
251            DataType::Timestamp => 8,
252            DataType::UUID => 36,
253        })
254        .sum()
255}
256
257fn create_table_with_config(conn: &Connection, config: &PopulationConfig) -> Result<()> {
258    let column_defs: Vec<String> = config
259        .columns
260        .iter()
261        .map(|col| {
262            let type_str = match col.data_type {
263                DataType::Integer => "INTEGER",
264                DataType::Text => "TEXT",
265                DataType::Real => "REAL",
266                DataType::Boolean => "INTEGER",
267                DataType::Date => "TEXT",
268                DataType::Timestamp => "TEXT",
269                DataType::UUID => "TEXT",
270            };
271
272            let nullable = if col.nullable { "" } else { " NOT NULL" };
273            format!("{} {}{}", col.name, type_str, nullable)
274        })
275        .collect();
276
277    let create_table_sql = format!(
278        "CREATE TABLE IF NOT EXISTS {} ({})",
279        config.table_name,
280        column_defs.join(", ")
281    );
282
283    conn.execute(&create_table_sql, [])
284        .context("Failed to create table. Check database permissions and disk space.")?;
285
286    // Check if table already has data
287    let existing_count: i64 = conn
288        .query_row(
289            &format!("SELECT COUNT(*) FROM {}", config.table_name),
290            [],
291            |row| row.get(0),
292        )
293        .context("Failed to check existing row count")?;
294
295    if existing_count > 0 {
296        println!(
297            "Table '{}' already contains {} rows",
298            config.table_name, existing_count
299        );
300        println!("Population will add {} more rows", config.row_count);
301    } else {
302        println!("Table '{}' created successfully", config.table_name);
303    }
304
305    Ok(())
306}
307
308fn populate_with_transaction(conn: &mut Connection, config: &PopulationConfig) -> Result<usize> {
309    let tx = conn.transaction().context("Failed to begin transaction")?;
310
311    let placeholders = (0..config.columns.len())
312        .map(|_| "?")
313        .collect::<Vec<_>>()
314        .join(", ");
315    let column_names: Vec<String> = config.columns.iter().map(|c| c.name.clone()).collect();
316    let column_names_str = column_names.join(", ");
317    let insert_sql = format!(
318        "INSERT INTO {} ({}) VALUES ({})",
319        config.table_name, column_names_str, placeholders
320    );
321
322    let mut stmt = tx
323        .prepare(&insert_sql)
324        .context("Failed to prepare insert statement")?;
325
326    let mut rng = if let Some(seed) = config.seed {
327        StdRng::seed_from_u64(seed)
328    } else {
329        StdRng::from_entropy()
330    };
331
332    let mut rows_inserted = 0;
333    let start_time = Instant::now();
334    let mut last_checkpoint = Instant::now();
335    let checkpoint_interval = Duration::from_secs(30);
336
337    for batch_start in (0..config.row_count).step_by(config.batch_size) {
338        let batch_end = std::cmp::min(batch_start + config.batch_size, config.row_count);
339
340        for i in batch_start..batch_end {
341            let values = generate_row_values(&config.columns, i, &mut rng);
342
343            match stmt.execute(rusqlite::params_from_iter(values)) {
344                Ok(_) => {
345                    rows_inserted += 1;
346
347                    // Show progress
348                    if rows_inserted % config.batch_size == 0 {
349                        let elapsed = start_time.elapsed();
350                        let rate = rows_inserted as f64 / elapsed.as_secs_f64();
351                        let eta = if rate > 0.0 {
352                            Duration::from_secs(
353                                ((config.row_count - rows_inserted) as f64 / rate) as u64,
354                            )
355                        } else {
356                            Duration::from_secs(0)
357                        };
358
359                        println!(
360                            "Progress: {}/{} rows ({:.1}%) - {:.0} rows/sec - ETA: {:?}",
361                            rows_inserted,
362                            config.row_count,
363                            (rows_inserted as f64 / config.row_count as f64) * 100.0,
364                            rate,
365                            eta
366                        );
367                    }
368                }
369                Err(e) => {
370                    eprintln!("Failed to insert row {}: {}", i + 1, e);
371
372                    // Try to continue with a few retries for transient errors
373                    if is_transient_error(&e) && should_retry_insert(rows_inserted) {
374                        eprintln!("Retrying row {}...", i + 1);
375                        std::thread::sleep(Duration::from_millis(10));
376                        continue;
377                    } else {
378                        return Err(e).with_context(|| {
379                            format!("Failed to insert row {} after retries", i + 1)
380                        });
381                    }
382                }
383            }
384        }
385
386        // Create checkpoint if enough time has passed
387        if last_checkpoint.elapsed() >= checkpoint_interval {
388            println!("Creating checkpoint...");
389            tx.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])
390                .context("Failed to create checkpoint")?;
391            last_checkpoint = Instant::now();
392        }
393    }
394
395    println!("Committing transaction...");
396    drop(stmt); // Release the prepared statement before committing
397    tx.commit()
398        .context("Failed to commit transaction. All changes have been rolled back.")?;
399
400    Ok(rows_inserted)
401}
402
403fn generate_row_values(
404    columns: &[ColumnConfig],
405    row_index: usize,
406    rng: &mut StdRng,
407) -> Vec<String> {
408    columns
409        .iter()
410        .map(|col| {
411            if col.nullable && rng.gen_bool(0.1) {
412                // 10% chance of NULL
413                return "NULL".to_string();
414            }
415
416            match (&col.data_type, &col.distribution) {
417                (DataType::Integer, DataDistribution::Sequential) => row_index.to_string(),
418                (DataType::Integer, DataDistribution::Uniform) => {
419                    rng.gen_range(0..1000).to_string()
420                }
421                (DataType::Integer, DataDistribution::Normal { mean, std_dev }) => {
422                    let value = rng.gen_range(0.0..1.0);
423                    let normal = (value - 0.5) * std_dev + mean;
424                    (normal.round() as i64).to_string()
425                }
426                (DataType::Text, DataDistribution::Random) => {
427                    format!("text-{}", rng.gen_range(0..1000))
428                }
429                (DataType::Text, DataDistribution::Custom(values)) => {
430                    values[rng.gen_range(0..values.len())].clone()
431                }
432                (DataType::Real, DataDistribution::Normal { mean, std_dev }) => {
433                    let value = rng.gen_range(0.0..1.0);
434                    let normal = (value - 0.5) * std_dev + mean;
435                    format!("{:.2}", normal)
436                }
437                (DataType::Boolean, _) => rng.gen_bool(0.5).to_string(),
438                (DataType::Date, _) => {
439                    let days = rng.gen_range(0..365);
440                    let date = Utc::now() - ChronoDuration::days(days);
441                    date.format("%Y-%m-%d").to_string()
442                }
443                (DataType::Timestamp, _) => {
444                    let seconds = rng.gen_range(0..86400);
445                    let timestamp = Utc::now() - ChronoDuration::seconds(seconds);
446                    timestamp.format("%Y-%m-%d %H:%M:%S").to_string()
447                }
448                (DataType::UUID, _) => Uuid::new_v4().to_string(),
449                _ => "".to_string(), // Default case
450            }
451        })
452        .collect()
453}
454
455fn is_transient_error(error: &rusqlite::Error) -> bool {
456    match error {
457        rusqlite::Error::SqliteFailure(err, _) => {
458            matches!(
459                err.code,
460                rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
461            )
462        }
463        _ => false,
464    }
465}
466
467fn should_retry_insert(rows_so_far: usize) -> bool {
468    rows_so_far < 100_000
469}
470
471fn cleanup_failed_population(conn: &Connection, table_name: &str) -> Result<()> {
472    // Check if table exists and has partial data
473    let table_exists: bool = conn
474        .query_row(
475            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
476            [table_name],
477            |row| {
478                let count: i64 = row.get(0)?;
479                Ok(count > 0)
480            },
481        )
482        .context("Failed to check if table exists")?;
483
484    if table_exists {
485        // Don't drop the table automatically, just report what to do
486        let row_count: i64 = conn
487            .query_row(&format!("SELECT COUNT(*) FROM {}", table_name), [], |row| {
488                row.get(0)
489            })
490            .context("Failed to count rows in table")?;
491
492        if row_count > 0 {
493            println!("Table '{}' contains {} partial rows", table_name, row_count);
494            println!(
495                "Run 'DROP TABLE {};' to remove it, or try populating again",
496                table_name
497            );
498        }
499    }
500
501    Ok(())
502}
503
504fn verify_population_success(conn: &Connection, config: &PopulationConfig) -> Result<()> {
505    let final_count: i64 = conn
506        .query_row(
507            &format!("SELECT COUNT(*) FROM {}", config.table_name),
508            [],
509            |row| row.get(0),
510        )
511        .context("Failed to verify population by counting rows")?;
512
513    // Get some sample data to verify integrity
514    let sample_row: Option<Vec<String>> = match conn.query_row(
515        &format!("SELECT * FROM {} LIMIT 1", config.table_name),
516        [],
517        |row| {
518            let mut values = Vec::new();
519            for i in 0..config.columns.len() {
520                // Handle different column types
521                let value = match config.columns[i].data_type {
522                    DataType::Integer => row.get::<_, i64>(i)?.to_string(),
523                    DataType::Text => row.get::<_, String>(i)?,
524                    DataType::Real => row.get::<_, f64>(i)?.to_string(),
525                    DataType::Boolean => row.get::<_, bool>(i)?.to_string(),
526                    DataType::Date | DataType::Timestamp | DataType::UUID => {
527                        row.get::<_, String>(i)?
528                    }
529                };
530                values.push(value);
531            }
532            Ok(values)
533        },
534    ) {
535        Ok(data) => Some(data),
536        Err(rusqlite::Error::QueryReturnedNoRows) => None,
537        Err(e) => return Err(e).context("Failed to verify sample data"),
538    };
539
540    if let Some(values) = sample_row {
541        if values.len() == config.columns.len() {
542            println!("Data integrity verification passed");
543        } else {
544            eprintln!("Warning: Data integrity check failed - sample data doesn't match expected column count");
545        }
546    }
547
548    println!(
549        "Final row count: {} rows in '{}'",
550        final_count, config.table_name
551    );
552
553    if final_count >= config.row_count as i64 {
554        println!("Population completed successfully!");
555    } else {
556        eprintln!(
557            "Warning: Expected at least {} rows but found {}",
558            config.row_count, final_count
559        );
560    }
561
562    Ok(())
563}