vapor_cli/
populate.rs

1use anyhow::{Context, Result};
2use rusqlite::Connection;
3use std::path::Path;
4use std::time::{Duration, Instant};
5use serde::{Deserialize, Serialize};
6use rand::{Rng, SeedableRng};
7use rand::rngs::StdRng;
8use uuid::Uuid;
9use chrono::{Utc, Duration as ChronoDuration};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct PopulationConfig {
13    pub table_name: String,
14    pub row_count: usize,
15    pub batch_size: usize,
16    pub seed: Option<u64>,
17    pub columns: Vec<ColumnConfig>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ColumnConfig {
22    pub name: String,
23    pub data_type: DataType,
24    pub distribution: DataDistribution,
25    pub nullable: bool,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum DataType {
30    Integer,
31    Text,
32    Real,
33    Boolean,
34    Date,
35    Timestamp,
36    UUID,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum DataDistribution {
41    Uniform,
42    Normal { mean: f64, std_dev: f64 },
43    Sequential,
44    Random,
45    Custom(Vec<String>),
46}
47
48impl Default for PopulationConfig {
49    fn default() -> Self {
50        Self {
51            table_name: "large_table".to_string(),
52            row_count: 1_000_000,
53            batch_size: 10_000,
54            seed: None,
55            columns: vec![
56                ColumnConfig {
57                    name: "id".to_string(),
58                    data_type: DataType::Integer,
59                    distribution: DataDistribution::Sequential,
60                    nullable: false,
61                },
62                ColumnConfig {
63                    name: "text_col".to_string(),
64                    data_type: DataType::Text,
65                    distribution: DataDistribution::Random,
66                    nullable: true,
67                },
68                ColumnConfig {
69                    name: "value".to_string(),
70                    data_type: DataType::Real,
71                    distribution: DataDistribution::Normal { mean: 100.0, std_dev: 15.0 },
72                    nullable: false,
73                },
74            ],
75        }
76    }
77}
78
79/// Populate database with test data, featuring comprehensive error handling and progress tracking
80pub fn populate_database(db_path: &str, config: Option<PopulationConfig>) -> Result<()> {
81    println!("Connecting to database: {}", db_path);
82    
83    // Validate database exists and is accessible
84    validate_database_for_population(db_path)?;
85    
86    let mut conn = create_connection_with_settings(db_path)?;
87    
88    // Check available disk space before starting
89    check_disk_space_requirements(db_path, &config)?;
90    
91    let config = config.unwrap_or_default();
92    println!("Creating table '{}'...", config.table_name);
93    create_table_with_config(&conn, &config)?;
94    
95    println!("Populating table with {} rows...", config.row_count);
96    println!("This may take a while. Progress will be shown every {} rows.", config.batch_size);
97    
98    let start_time = Instant::now();
99    
100    // Use transaction for better performance and atomicity
101    let result = populate_with_transaction(&mut conn, &config);
102    
103    match result {
104        Ok(rows_inserted) => {
105            let duration = start_time.elapsed();
106            println!("Successfully populated table '{}' with {} rows", config.table_name, rows_inserted);
107            println!("Total time: {:.2} seconds", duration.as_secs_f64());
108            println!("Average: {:.0} rows/second", rows_inserted as f64 / duration.as_secs_f64());
109        }
110        Err(e) => {
111            eprintln!("Population failed: {}", e);
112            eprintln!("Attempting to rollback any partial changes...");
113            
114            // Try to clean up any partial data
115            if let Err(cleanup_err) = cleanup_failed_population(&conn, &config.table_name) {
116                eprintln!("Warning: Cleanup failed: {}", cleanup_err);
117                eprintln!("You may need to manually drop the table if it was partially created.");
118            } else {
119                println!("Cleanup completed successfully");
120            }
121            
122            return Err(e);
123        }
124    }
125    
126    // Verify the population was successful
127    verify_population_success(&conn, &config)?;
128    
129    Ok(())
130}
131
132fn validate_database_for_population(db_path: &str) -> Result<()> {
133    if !Path::new(db_path).exists() {
134        anyhow::bail!("Database '{}' does not exist. Create it first with 'init' command.", db_path);
135    }
136    
137    let metadata = std::fs::metadata(db_path)
138        .with_context(|| format!("Cannot read database file '{}'", db_path))?;
139        
140    if metadata.is_dir() {
141        anyhow::bail!("'{}' is a directory, not a database file", db_path);
142    }
143    
144    Ok(())
145}
146
147fn create_connection_with_settings(db_path: &str) -> Result<Connection> {
148    let conn = Connection::open(db_path)
149        .with_context(|| format!("Failed to connect to database: {}", db_path))?;
150    
151    // Configure SQLite for better performance during bulk inserts
152    conn.pragma_update(None, "synchronous", "OFF")
153        .context("Failed to disable synchronous mode")?;
154    
155    conn.pragma_update(None, "journal_mode", "MEMORY")
156        .context("Failed to set journal mode to memory")?;
157    
158    conn.pragma_update(None, "cache_size", "10000")
159        .context("Failed to increase cache size")?;
160    
161    println!("Database configured for bulk insert performance");
162    
163    Ok(conn)
164}
165
166fn check_disk_space_requirements(db_path: &str, config: &Option<PopulationConfig>) -> Result<()> {
167    let default_config = PopulationConfig::default();
168    let config = config.as_ref().unwrap_or(&default_config);
169    // Estimate space needed based on column types and row count
170    let avg_row_size = estimate_row_size(&config.columns);
171    let estimated_size_mb = (avg_row_size * config.row_count) as f64 / (1024.0 * 1024.0);
172    
173    println!("Estimated space needed: ~{:.1} MB", estimated_size_mb);
174    
175    // Try to get available space (this is platform-specific, so we'll make it non-fatal)
176    if let Ok(metadata) = std::fs::metadata(db_path) {
177        if metadata.len() == 0 {
178            eprintln!("Warning: Database file appears to be empty");
179        }
180    }
181    
182    println!("Ensure you have sufficient disk space before proceeding");
183    Ok(())
184}
185
186fn estimate_row_size(columns: &[ColumnConfig]) -> usize {
187    columns.iter().map(|col| match col.data_type {
188        DataType::Integer => 8,
189        DataType::Text => 50, // Average text length
190        DataType::Real => 8,
191        DataType::Boolean => 1,
192        DataType::Date => 8,
193        DataType::Timestamp => 8,
194        DataType::UUID => 36,
195    }).sum()
196}
197
198fn create_table_with_config(conn: &Connection, config: &PopulationConfig) -> Result<()> {
199    let column_defs: Vec<String> = config.columns.iter().map(|col| {
200        let type_str = match col.data_type {
201            DataType::Integer => "INTEGER",
202            DataType::Text => "TEXT",
203            DataType::Real => "REAL",
204            DataType::Boolean => "INTEGER",
205            DataType::Date => "TEXT",
206            DataType::Timestamp => "TEXT",
207            DataType::UUID => "TEXT",
208        };
209        
210        let nullable = if col.nullable { "" } else { " NOT NULL" };
211        format!("{} {}{}", col.name, type_str, nullable)
212    }).collect();
213    
214    let create_table_sql = format!(
215        "CREATE TABLE IF NOT EXISTS {} ({})",
216        config.table_name,
217        column_defs.join(", ")
218    );
219    
220    conn.execute(&create_table_sql, [])
221        .context("Failed to create table. Check database permissions and disk space.")?;
222    
223    // Check if table already has data
224    let existing_count: i64 = conn.query_row(
225        &format!("SELECT COUNT(*) FROM {}", config.table_name),
226        [],
227        |row| row.get(0)
228    ).context("Failed to check existing row count")?;
229    
230    if existing_count > 0 {
231        println!("Table '{}' already contains {} rows", config.table_name, existing_count);
232        println!("Population will add {} more rows", config.row_count);
233    } else {
234        println!("Table '{}' created successfully", config.table_name);
235    }
236    
237    Ok(())
238}
239
240fn populate_with_transaction(conn: &mut Connection, config: &PopulationConfig) -> Result<usize> {
241    let tx = conn.transaction()
242        .context("Failed to begin transaction")?;
243    
244    let placeholders = (0..config.columns.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
245    let column_names: Vec<String> = config.columns.iter().map(|c| c.name.clone()).collect();
246    let column_names_str = column_names.join(", ");
247    let insert_sql = format!(
248        "INSERT INTO {} ({}) VALUES ({})",
249        config.table_name,
250        column_names_str,
251        placeholders
252    );
253    
254    let mut stmt = tx.prepare(&insert_sql)
255        .context("Failed to prepare insert statement")?;
256    
257    let mut rng = if let Some(seed) = config.seed {
258        StdRng::seed_from_u64(seed)
259    } else {
260        StdRng::from_entropy()
261    };
262    
263    let mut rows_inserted = 0;
264    let start_time = Instant::now();
265    let mut last_checkpoint = Instant::now();
266    let checkpoint_interval = Duration::from_secs(30);
267    
268    for batch_start in (0..config.row_count).step_by(config.batch_size) {
269        let batch_end = std::cmp::min(batch_start + config.batch_size, config.row_count);
270        
271        for i in batch_start..batch_end {
272            let values = generate_row_values(&config.columns, i, &mut rng);
273            
274            match stmt.execute(rusqlite::params_from_iter(values)) {
275                Ok(_) => {
276                    rows_inserted += 1;
277                    
278                    // Show progress
279                    if rows_inserted % config.batch_size == 0 {
280                        let elapsed = start_time.elapsed();
281                        let rate = rows_inserted as f64 / elapsed.as_secs_f64();
282                        let eta = if rate > 0.0 {
283                            Duration::from_secs(((config.row_count - rows_inserted) as f64 / rate) as u64)
284                        } else {
285                            Duration::from_secs(0)
286                        };
287                        
288                        println!("Progress: {}/{} rows ({:.1}%) - {:.0} rows/sec - ETA: {:?}", 
289                            rows_inserted, config.row_count, 
290                            (rows_inserted as f64 / config.row_count as f64) * 100.0,
291                            rate,
292                            eta);
293                    }
294                }
295                Err(e) => {
296                    eprintln!("Failed to insert row {}: {}", i + 1, e);
297                    
298                    // Try to continue with a few retries for transient errors
299                    if is_transient_error(&e) && should_retry_insert(rows_inserted) {
300                        eprintln!("Retrying row {}...", i + 1);
301                        std::thread::sleep(Duration::from_millis(10));
302                        continue;
303                    } else {
304                        return Err(e).with_context(|| format!("Failed to insert row {} after retries", i + 1));
305                    }
306                }
307            }
308        }
309        
310        // Create checkpoint if enough time has passed
311        if last_checkpoint.elapsed() >= checkpoint_interval {
312            println!("Creating checkpoint...");
313            tx.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])
314                .context("Failed to create checkpoint")?;
315            last_checkpoint = Instant::now();
316        }
317    }
318    
319    println!("Committing transaction...");
320    drop(stmt); // Release the prepared statement before committing
321    tx.commit()
322        .context("Failed to commit transaction. All changes have been rolled back.")?;
323    
324    Ok(rows_inserted)
325}
326
327fn generate_row_values(columns: &[ColumnConfig], row_index: usize, rng: &mut StdRng) -> Vec<String> {
328    columns.iter().map(|col| {
329        if col.nullable && rng.gen_bool(0.1) { // 10% chance of NULL
330            return "NULL".to_string();
331        }
332        
333        match (&col.data_type, &col.distribution) {
334            (DataType::Integer, DataDistribution::Sequential) => row_index.to_string(),
335            (DataType::Integer, DataDistribution::Uniform) => rng.gen_range(0..1000).to_string(),
336            (DataType::Integer, DataDistribution::Normal { mean, std_dev }) => {
337                let value = rng.gen_range(0.0..1.0);
338                let normal = (value - 0.5) * std_dev + mean;
339                (normal.round() as i64).to_string()
340            },
341            (DataType::Text, DataDistribution::Random) => {
342                format!("text-{}", rng.gen_range(0..1000))
343            },
344            (DataType::Text, DataDistribution::Custom(values)) => {
345                values[rng.gen_range(0..values.len())].clone()
346            },
347            (DataType::Real, DataDistribution::Normal { mean, std_dev }) => {
348                let value = rng.gen_range(0.0..1.0);
349                let normal = (value - 0.5) * std_dev + mean;
350                format!("{:.2}", normal)
351            },
352            (DataType::Boolean, _) => rng.gen_bool(0.5).to_string(),
353            (DataType::Date, _) => {
354                let days = rng.gen_range(0..365);
355                let date = Utc::now() - ChronoDuration::days(days);
356                date.format("%Y-%m-%d").to_string()
357            },
358            (DataType::Timestamp, _) => {
359                let seconds = rng.gen_range(0..86400);
360                let timestamp = Utc::now() - ChronoDuration::seconds(seconds);
361                timestamp.format("%Y-%m-%d %H:%M:%S").to_string()
362            },
363            (DataType::UUID, _) => Uuid::new_v4().to_string(),
364            _ => "".to_string(), // Default case
365        }
366    }).collect()
367}
368
369fn is_transient_error(error: &rusqlite::Error) -> bool {
370    match error {
371        rusqlite::Error::SqliteFailure(err, _) => {
372            matches!(err.code, rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked)
373        }
374        _ => false
375    }
376}
377
/// Returns `true` while fewer than 100 000 rows have been inserted so far;
/// from that point on failed inserts are no longer eligible for a retry.
fn should_retry_insert(rows_so_far: usize) -> bool {
    const RETRY_ROW_LIMIT: usize = 100_000;
    !(rows_so_far >= RETRY_ROW_LIMIT)
}
381
382fn cleanup_failed_population(conn: &Connection, table_name: &str) -> Result<()> {
383    // Check if table exists and has partial data
384    let table_exists: bool = conn.query_row(
385        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
386        [table_name],
387        |row| {
388            let count: i64 = row.get(0)?;
389            Ok(count > 0)
390        }
391    ).context("Failed to check if table exists")?;
392    
393    if table_exists {
394        // Don't drop the table automatically, just report what to do
395        let row_count: i64 = conn.query_row(
396            &format!("SELECT COUNT(*) FROM {}", table_name),
397            [],
398            |row| row.get(0)
399        ).context("Failed to count rows in table")?;
400        
401        if row_count > 0 {
402            println!("Table '{}' contains {} partial rows", table_name, row_count);
403            println!("Run 'DROP TABLE {};' to remove it, or try populating again", table_name);
404        }
405    }
406    
407    Ok(())
408}
409
410fn verify_population_success(conn: &Connection, config: &PopulationConfig) -> Result<()> {
411    let final_count: i64 = conn.query_row(
412        &format!("SELECT COUNT(*) FROM {}", config.table_name),
413        [],
414        |row| row.get(0)
415    ).context("Failed to verify population by counting rows")?;
416    
417    // Get some sample data to verify integrity
418    let sample_row: Option<Vec<String>> = match conn.query_row(
419        &format!("SELECT * FROM {} LIMIT 1", config.table_name),
420        [],
421        |row| {
422            let mut values = Vec::new();
423            for i in 0..config.columns.len() {
424                // Handle different column types
425                let value = match config.columns[i].data_type {
426                    DataType::Integer => row.get::<_, i64>(i)?.to_string(),
427                    DataType::Text => row.get::<_, String>(i)?,
428                    DataType::Real => row.get::<_, f64>(i)?.to_string(),
429                    DataType::Boolean => row.get::<_, bool>(i)?.to_string(),
430                    DataType::Date | DataType::Timestamp | DataType::UUID => row.get::<_, String>(i)?,
431                };
432                values.push(value);
433            }
434            Ok(values)
435        }
436    ) {
437        Ok(data) => Some(data),
438        Err(rusqlite::Error::QueryReturnedNoRows) => None,
439        Err(e) => return Err(e).context("Failed to verify sample data"),
440    };
441    
442    if let Some(values) = sample_row {
443        if values.len() == config.columns.len() {
444            println!("Data integrity verification passed");
445        } else {
446            eprintln!("Warning: Data integrity check failed - sample data doesn't match expected column count");
447        }
448    }
449    
450    println!("Final row count: {} rows in '{}'", final_count, config.table_name);
451    
452    if final_count >= config.row_count as i64 {
453        println!("Population completed successfully!");
454    } else {
455        eprintln!("Warning: Expected at least {} rows but found {}", config.row_count, final_count);
456    }
457    
458    Ok(())
459}