vapor_cli/
repl.rs

1use anyhow::{Context, Result};
2use atty::Stream;
3use rusqlite::Connection;
4use rustyline::DefaultEditor;
5use std::io::{Read, Write};
6use std::path::Path;
7use std::sync::{Arc, Mutex};
8use crate::shell::Shell;
9
10use crate::bookmarks::BookmarkManager;
11use crate::db::list_tables;
12use crate::display::{execute_sql, show_table_schema, show_all_schemas, show_database_info, OutputFormat, QueryOptions};
13use crate::export::export_to_csv;
14use crate::transactions::TransactionManager;
15
16/// Start an interactive SQL REPL (Read-Eval-Print Loop) with enhanced error handling
17pub fn repl_mode(db_path: &str) -> Result<()> {
18    // Convert to absolute path
19    let db_path = std::fs::canonicalize(db_path)
20        .with_context(|| format!("Failed to resolve absolute path for database '{}'", db_path))?
21        .to_str()
22        .ok_or_else(|| anyhow::anyhow!("Database path contains invalid UTF-8 characters"))?
23        .to_string();
24
25    // Validate database exists and is accessible
26    if !Path::new(&db_path).exists() {
27        anyhow::bail!(
28            "Database '{}' does not exist. Use 'vapor-cli init --name {}' to create it.",
29            db_path,
30            db_path.trim_end_matches(".db")
31        );
32    }
33
34    // Verify database integrity before starting REPL
35    verify_database_file(&db_path)?;
36
37    // Connect to the database with retry logic
38    let conn = create_robust_connection(&db_path)?;
39
40    // Handle non-interactive mode (piped input)
41    if !atty::is(Stream::Stdin) {
42        return handle_non_interactive_mode(&conn);
43    }
44
45    println!("Connected to database: {}", db_path);
46    println!("REPL with timing, bookmarks, and transaction support");
47    print_help_summary();
48
49    // Initialize REPL components with error handling
50    let mut rl = match DefaultEditor::new() {
51        Ok(editor) => editor,
52        Err(e) => {
53            eprintln!("Warning: Could not initialize readline editor: {}", e);
54            eprintln!("   Falling back to basic input mode.");
55            return handle_basic_repl_mode(&conn);
56        }
57    };
58
59    // Load command history if available
60    let history_path = Path::new(".vapor_history");
61    if history_path.exists() {
62        if let Err(e) = rl.load_history(history_path) {
63            eprintln!("Warning: Could not load command history: {}", e);
64        }
65    }
66
67    let mut multi_line_input = String::new();
68    let last_select_query = Arc::new(Mutex::new(String::new()));
69    let bookmarks = Arc::new(Mutex::new(BookmarkManager::new()
70        .with_context(|| "Failed to initialize bookmarks")?));
71    let transaction_manager = TransactionManager::new();
72    let mut query_options = QueryOptions::default();
73
74    loop {
75        let prompt = get_prompt(&multi_line_input, &transaction_manager);
76        
77        let readline = rl.readline(prompt);
78        match readline {
79            Ok(line) => {
80                let line = line.trim();
81                if line.is_empty() && multi_line_input.is_empty() {
82                    continue;
83                }
84
85                // Handle multi-line input
86                let command_to_execute = handle_multi_line_input(&mut multi_line_input, line);
87
88                                if let Some(command) = command_to_execute {
89                     // Add to history before execution
90                     if let Err(e) = rl.add_history_entry(&command) {
91                         eprintln!("Warning: Could not add to history: {}", e);
92                     }
93
94                     // Check if it's a special command first
95                     if let Err(_e) = handle_special_commands(
96                         &command,
97                         &conn,
98                         &db_path,
99                         &bookmarks,    
100                         &last_select_query,
101                         &transaction_manager,
102                         &mut query_options,
103                     ) {
104                         // If not a special command, try SQL execution
105                         if let Ok(handled) = transaction_manager.handle_sql_command(&conn, &command) {
106                             if !handled {
107                                 // Regular SQL command
108                                 if let Err(sql_err) = execute_sql(&conn, &command, &query_options) {
109                                     print_command_error(&command, &sql_err);
110                                     
111                                     // For critical errors, offer to reconnect
112                                     if is_critical_error(&sql_err) {
113                                         if offer_reconnection(&db_path) {
114                                             match create_robust_connection(&db_path) {
115                                                 Ok(_new_conn) => {
116                                                     println!("Reconnected successfully!");
117                                                     // Note: Connection replacement would require refactoring
118                                                 }
119                                                 Err(reconnect_err) => {
120                                                     eprintln!("Reconnection failed: {}", reconnect_err);
121                                                     eprintln!("You may need to restart the REPL.");
122                                                 }
123                                             }
124                                         }
125                                     }
126                                 }
127                             }
128                         } else {
129                             println!("Error handling transaction command");
130                         }
131                     }
132                 } else {
133                     // Handle single-line special commands (for incomplete multi-line)
134                     if let Err(e) = handle_single_line_command(
135                         line,
136                         &conn,
137                         &db_path,
138                         &bookmarks,
139                         &last_select_query,
140                         &transaction_manager,
141                         &mut query_options,
142                         &rl,
143                     ) {
144                         print_command_error(line, &e);
145                     }
146                 }
147            }
148            Err(rustyline::error::ReadlineError::Interrupted) => {
149                println!("^C");
150                continue;
151            }
152            Err(rustyline::error::ReadlineError::Eof) => {
153                println!("EOF");
154                break;
155            }
156            Err(err) => {
157                eprintln!("Input error: {}", err);
158                eprintln!("Try typing your command again or type 'help' for assistance.");
159                continue;
160            }
161        }
162    }
163
164    // Cleanup on exit
165    cleanup_repl_session(&conn, &transaction_manager, &mut rl, history_path)?;
166    println!("Goodbye!");
167    Ok(())
168}
169
170fn verify_database_file(db_path: &str) -> Result<()> {
171    let metadata = std::fs::metadata(db_path)
172        .with_context(|| format!("Cannot read database file '{}'", db_path))?;
173    
174    if metadata.is_dir() {
175        anyhow::bail!("'{}' is a directory, not a database file", db_path);
176    }
177    
178    if metadata.len() == 0 {
179        eprintln!("Warning: Database file '{}' is empty", db_path);
180    }
181    
182    Ok(())
183}
184
185fn create_robust_connection(db_path: &str) -> Result<Connection> {
186    let mut last_error = None;
187    let max_retries = 3;
188    
189    for attempt in 1..=max_retries {
190        match Connection::open(db_path) {
191            Ok(conn) => {
192                if attempt > 1 {
193                    println!("Connection succeeded on attempt {}", attempt);
194                }
195                return Ok(conn);
196            }
197            Err(e) => {
198                last_error = Some(e);
199                if attempt < max_retries {
200                    println!("Connection attempt {} failed, retrying...", attempt);
201                    std::thread::sleep(std::time::Duration::from_millis(100 * attempt as u64));
202                }
203            }
204        }
205    }
206    
207    Err(last_error.unwrap())
208        .with_context(|| format!(
209            "Failed to connect to database '{}' after {} attempts. Database may be locked or corrupted.",
210            db_path, max_retries
211        ))
212}
213
214fn handle_non_interactive_mode(conn: &Connection) -> Result<()> {
215    let mut input = String::new();
216    std::io::stdin().read_to_string(&mut input)?;
217    
218    let options = QueryOptions::default();
219    execute_sql(conn, &input, &options)
220}
221
222fn handle_basic_repl_mode(conn: &Connection) -> Result<()> {
223    let mut buffer = String::with_capacity(1024);  // Pre-allocate buffer with reasonable capacity
224    let options = QueryOptions::default();
225    let stdout = std::io::stdout();
226    let mut stdout_handle = stdout.lock();  // Lock stdout once instead of multiple times
227    
228    loop {
229        stdout_handle.write_all(b"vapor> ")?;
230        stdout_handle.flush()?;
231        
232        buffer.clear();  // Clear buffer without deallocating
233        if std::io::stdin().read_line(&mut buffer)? == 0 {
234            break;
235        }
236        
237        let line = buffer.trim();
238        if line.is_empty() {
239            continue;
240        }
241        
242        if let Err(e) = execute_sql(conn, line, &options) {
243            writeln!(stdout_handle, "Error: {}", e)?;
244        }
245    }
246    
247    Ok(())
248}
249
250fn get_prompt(multi_line_input: &str, transaction_manager: &TransactionManager) -> &'static str {
251    if multi_line_input.is_empty() {
252        if transaction_manager.is_active() {
253            "*> "
254        } else {
255            "> "
256        }
257    } else {
258        "... "
259    }
260}
261
262fn handle_multi_line_input(multi_line_input: &mut String, line: &str) -> Option<String> {
263    if !multi_line_input.is_empty() {
264        multi_line_input.push_str(" ");
265        multi_line_input.push_str(line);
266        if line.ends_with(';') {
267            let command = multi_line_input.trim().to_string();
268            multi_line_input.clear();
269            Some(command)
270        } else {
271            None
272        }
273    } else if line.ends_with(';') || is_complete_command(line) {
274        Some(line.to_string())
275    } else {
276        multi_line_input.push_str(line);
277        None
278    }
279}
280
/// Report whether `line` can be executed without a terminating semicolon.
///
/// The comparison is case-insensitive. A line qualifies when it is exactly one
/// of the bare REPL words, or when it starts with one of the listed prefixes
/// (dot-commands, `schema`, transaction keywords, `drop`).
fn is_complete_command(line: &str) -> bool {
    const BARE_WORDS: [&str; 6] = ["exit", "quit", "help", "tables", "clear", "info"];
    const PREFIXES: [&str; 6] = ["schema", ".", "begin", "commit", "rollback", "drop"];

    let normalized = line.to_lowercase();

    BARE_WORDS.contains(&normalized.as_str())
        || PREFIXES.iter().any(|&prefix| normalized.starts_with(prefix))
}
293
/// Print the short command overview shown when the REPL starts.
fn print_help_summary() {
    // One entry per output line, printed in order.
    const SUMMARY_LINES: &[&str] = &[
        "Vapor CLI - SQLite Database Management",
        "\nSpecial Commands:",
        "  .help              Show this help message",
        "  .tables            List all tables",
        "  .schema [table]    Show schema for all tables or specific table",
        "  .info             Show database information",
        "  .format [type]    Set output format (table, json, csv)",
        "  .limit [n]        Set row limit (0 for no limit)",
        "  .timing           Enable query timing",
        "  .notiming         Disable query timing",
        "  .clear            Clear screen",
        "  .exit/.quit       Exit REPL",
        "\nSQL Commands:",
        "  Enter any valid SQL command ending with semicolon",
        "  Example: SELECT * FROM users;",
    ];

    for summary_line in SUMMARY_LINES {
        println!("{}", summary_line);
    }
}
311
312fn print_command_error(command: &str, error: &anyhow::Error) {
313    eprintln!("Error executing command '{}':", command);
314    eprintln!("{}", error);
315}
316
317fn is_critical_error(error: &anyhow::Error) -> bool {
318    let error_msg = error.to_string().to_lowercase();
319    error_msg.contains("database is locked") ||
320    error_msg.contains("connection") ||
321    error_msg.contains("i/o error") ||
322    error_msg.contains("disk")
323}
324
/// Ask the user on stdout/stdin whether to reconnect to `db_path`.
///
/// Returns `true` only when the answer, trimmed and lower-cased, starts with
/// `y`. A failed read counts as "no".
fn offer_reconnection(db_path: &str) -> bool {
    print!("Would you like to try reconnecting to '{}'? (y/N): ", db_path);
    // Best effort: a failed flush only delays the prompt.
    let _ = std::io::stdout().flush();

    let mut answer = String::new();
    match std::io::stdin().read_line(&mut answer) {
        Ok(_) => answer.trim().to_lowercase().starts_with('y'),
        Err(_) => false,
    }
}
336
337fn cleanup_repl_session(
338    conn: &Connection,
339    transaction_manager: &TransactionManager,
340    rl: &mut DefaultEditor,
341    history_path: &Path,
342) -> Result<()> {
343    // Rollback any active transaction
344    if transaction_manager.is_active() {
345        println!("Rolling back active transaction...");
346        transaction_manager.rollback_transaction(conn)?;
347    }
348    
349    // Save command history
350    if let Err(e) = rl.save_history(history_path) {
351        eprintln!("Warning: Could not save command history: {}", e);
352    }
353    
354    Ok(())
355}
356
357fn handle_special_commands(
358    command: &str,
359    conn: &Connection,
360    db_path: &str,
361    bookmarks: &Arc<Mutex<BookmarkManager>>,
362    last_select_query: &Arc<Mutex<String>>,
363    transaction_manager: &TransactionManager,
364    query_options: &mut QueryOptions,
365) -> Result<()> {
366    let command = command.trim();
367    
368    match command {
369        ".help" => {
370            show_help();
371            Ok(())
372        }
373        ".shell" => {
374            let mut shell = Shell::new(db_path);
375            shell.run();
376            Ok(())
377        }
378        ".exit" | ".quit" | "exit" | "quit" => {
379            std::process::exit(0);
380        }
381        ".tables" => {
382            let _ = list_tables(db_path)?;
383            Ok(())
384        }
385        ".clear" => {
386            print!("\x1B[2J\x1B[1;1H");
387            std::io::stdout().flush().context("Failed to flush stdout")?;
388            Ok(())
389        }
390        ".info" => {
391            show_database_info(conn, db_path)?;
392            Ok(())
393        }
394        ".format" => {
395            println!("Current format: {:?}", query_options.format);
396            println!("Available formats: table, json, csv");
397            Ok(())
398        }
399        ".format table" => {
400            query_options.format = OutputFormat::Table;
401            Ok(())
402        }
403        ".format json" => {
404            query_options.format = OutputFormat::Json;
405            Ok(())
406        }
407        ".format csv" => {
408            query_options.format = OutputFormat::Csv;
409            Ok(())
410        }
411        ".limit" => {
412            match query_options.max_rows {
413                None => println!("No row limit set"),
414                Some(n) => println!("Current row limit: {}", n),
415            }
416            Ok(())
417        }
418        ".limit 0" => {
419            query_options.max_rows = None;
420            println!("Row limit removed");
421            Ok(())
422        }
423        limit_cmd if limit_cmd.starts_with(".limit ") => {
424            if let Ok(n) = limit_cmd.split_whitespace().nth(1).unwrap().parse::<usize>() {
425                query_options.max_rows = Some(n);
426                println!("Row limit set to {}", n);
427            } else {
428                println!("Invalid limit value. Use a positive number or 0 for no limit.");
429            }
430            Ok(())
431        }
432        ".timing" => {
433            query_options.show_timing = true;
434            println!("Query timing enabled");
435            Ok(())
436        }
437        ".notiming" => {
438            query_options.show_timing = false;
439            println!("Query timing disabled");
440            Ok(())
441        }
442        ".timing status" => {
443            println!("Query timing: {}", if query_options.show_timing { "on" } else { "off" });
444            Ok(())
445        }
446        ".export" => {
447            let parts: Vec<&str> = command.split_whitespace().collect();
448            if parts.len() > 1 {
449                let filename = parts[1];
450                let query = last_select_query.lock().unwrap().clone();
451                if query.is_empty() {
452                    println!("No SELECT query has been executed yet.");
453                } else {
454                    export_to_csv(conn, &query, filename)?;
455                }
456            } else {
457                println!("Usage: .export FILENAME");
458            }
459            Ok(())
460        }
461        ".import" => {
462            let parts: Vec<&str> = command.split_whitespace().collect();
463            if parts.len() >= 3 {
464                let filename = parts[1];
465                let table_name = parts[2];
466                import_csv_to_table(conn, filename, table_name)?;
467            } else {
468                println!("Usage: .import CSV_FILENAME TABLE_NAME");
469                println!("Example: .import data.csv employees");
470            }
471            Ok(())
472        }
473        ".bookmark" => {
474            handle_bookmark_command(command, bookmarks, last_select_query, conn, query_options)?;
475            Ok(())
476        }
477        schema_cmd if schema_cmd.starts_with(".schema") => {
478            let parts: Vec<&str> = schema_cmd.split_whitespace().collect();
479            if parts.len() == 1 {
480                // Just ".schema" - show all schemas
481                show_all_schemas(conn)?;
482            } else if parts.len() == 2 {
483                // ".schema table_name" - show specific table schema
484                show_table_schema(conn, parts[1])?;
485            } else {
486                println!("Usage: .schema [table_name]");
487            }
488            Ok(())
489        }
490        ".status" => {
491            transaction_manager.show_status();
492            Ok(())
493        }
494        _ => {
495            // Handle DROP commands
496            if command.to_lowercase().starts_with("drop") {
497                let parts: Vec<&str> = command.split_whitespace().collect();
498                if parts.len() < 2 {
499                    println!("Usage: DROP TABLE table_name; or DROP table_name;");
500                    Ok(())
501                } else {
502                    let table_name = if parts[1].to_lowercase() == "table" {
503                        if parts.len() < 3 {
504                            println!("Usage: DROP TABLE table_name;");
505                            return Ok(());
506                        }
507                        parts[2].trim_end_matches(';')
508                    } else {
509                        parts[1].trim_end_matches(';')
510                    };
511                    
512                    // Verify table exists before dropping
513                    if !check_table_exists_repl(conn, table_name)? {
514                        println!("Table '{}' does not exist", table_name);
515                        Ok(())
516                    } else {
517                        // Execute the DROP command
518                        conn.execute(&format!("DROP TABLE {}", table_name), [])
519                            .with_context(|| format!("Failed to drop table '{}'", table_name))?;
520                        
521                        println!("Table '{}' dropped successfully", table_name);
522                        Ok(())
523                    }
524                }
525            } else {
526                // Execute as SQL command
527                execute_sql(conn, command, query_options)?;
528                Ok(())
529            }
530        }
531    }
532}
533
534fn handle_single_line_command(
535    line: &str,
536    conn: &Connection,
537    db_path: &str,
538    _bookmarks: &Arc<Mutex<BookmarkManager>>,
539    _last_select_query: &Arc<Mutex<String>>,
540    _transaction_manager: &TransactionManager,
541    query_options: &mut QueryOptions,
542    _rl: &DefaultEditor,
543) -> Result<()> {
544    match line.trim() {
545        ".help" => print_help_summary(),
546        ".tables" => { let _ = list_tables(db_path)?; },
547        ".schema" => {
548            if line.contains(' ') {
549                let table_name = line.split_whitespace().nth(1).unwrap();
550                show_table_schema(conn, table_name)?;
551            } else {
552                show_all_schemas(conn)?;
553            }
554        }
555        ".info" => show_database_info(conn, db_path)?,
556        ".format" => {
557            println!("Current format: {:?}", query_options.format);
558            println!("Available formats: table, json, csv");
559        }
560        ".format table" => query_options.format = OutputFormat::Table,
561        ".format json" => query_options.format = OutputFormat::Json,
562        ".format csv" => query_options.format = OutputFormat::Csv,
563        ".limit" => {
564            match query_options.max_rows {
565                None => println!("No row limit set"),
566                Some(n) => println!("Current row limit: {}", n),
567            }
568        }
569        ".limit 0" => {
570            query_options.max_rows = None;
571            println!("Row limit removed");
572        }
573        limit_cmd if limit_cmd.starts_with(".limit ") => {
574            if let Ok(n) = limit_cmd.split_whitespace().nth(1).unwrap().parse::<usize>() {
575                query_options.max_rows = Some(n);
576                println!("Row limit set to {}", n);
577            } else {
578                println!("Invalid limit value. Use a positive number or 0 for no limit.");
579            }
580        }
581        ".timing" => {
582            query_options.show_timing = true;
583            println!("Query timing enabled");
584        }
585        ".notiming" => {
586            query_options.show_timing = false;
587            println!("Query timing disabled");
588        }
589        ".timing status" => {
590            println!("Query timing: {}", if query_options.show_timing { "on" } else { "off" });
591        }
592        ".clear" => {
593            print!("\x1B[2J\x1B[1;1H");
594        }
595        ".exit" | ".quit" | "exit" | "quit" => {
596            std::process::exit(0);
597        }
598        _ => {
599            // Execute as SQL command
600            execute_sql(conn, line, query_options)?;
601        }
602    }
603    Ok(())
604}
605
606fn handle_bookmark_command(
607    line: &str,
608    bookmarks: &Arc<Mutex<BookmarkManager>>,
609    last_select_query: &Arc<Mutex<String>>,
610    conn: &Connection,
611    query_options: &QueryOptions,
612) -> Result<()> {
613    let parts: Vec<&str> = line.split_whitespace().collect();
614    if parts.len() < 2 {
615        println!("Usage: .bookmark [save|list|run|show|delete] [args...]");
616        return Ok(());
617    }
618
619    let mut bookmarks = bookmarks.lock().unwrap();
620
621    match parts[1] {
622        "save" => {
623            if parts.len() < 3 {
624                println!("Usage: .bookmark save NAME [DESCRIPTION]");
625                return Ok(());
626            }
627            let name = parts[2].to_string();
628            let description = if parts.len() > 3 {
629                Some(parts[3..].join(" "))
630            } else {
631                None
632            };
633            let query = last_select_query.lock().unwrap().clone();
634            if query.is_empty() {
635                println!("No query to save. Execute a query first.");
636            } else {
637                bookmarks.save_bookmark(name.clone(), query, description)?;
638                println!("Bookmark '{}' saved.", name);
639            }
640        },
641        "list" => {
642            bookmarks.list_bookmarks();
643        },
644        "run" => {
645            if parts.len() < 3 {
646                println!("Usage: .bookmark run NAME");
647                return Ok(());
648            }
649            let name = parts[2];
650            if let Some(bookmark) = bookmarks.get_bookmark(name) {
651                println!("Executing bookmark '{}': {}", name, bookmark.query);
652                execute_sql(conn, &bookmark.query, query_options)?;
653            } else {
654                println!("Bookmark '{}' not found.", name);
655            }
656        },
657        "show" => {
658            if parts.len() < 3 {
659                println!("Usage: .bookmark show NAME");
660                return Ok(());
661            }
662            let name = parts[2];
663            if bookmarks.show_bookmark(name).is_none() {
664                println!("Bookmark '{}' not found.", name);
665            }
666        },
667        "delete" => {
668            if parts.len() < 3 {
669                println!("Usage: .bookmark delete NAME");
670                return Ok(());
671            }
672            let name = parts[2];
673            if bookmarks.delete_bookmark(name)? {
674                println!("Bookmark '{}' deleted.", name);
675            } else {
676                println!("Bookmark '{}' not found.", name);
677            }
678        },
679        _ => {
680            println!("Unknown bookmark command. Use: save, list, run, show, or delete");
681        }
682    }
683    Ok(())
684}
685
/// Display the full help text for the REPL.
///
/// Commands are listed in the dotted spelling that the command dispatcher
/// matches on, so every entry can be typed exactly as shown.
pub fn show_help() {
    const HELP_LINES: &[&str] = &[
        "Enhanced REPL Commands:",
        "",
        "SQL Operations:",
        "  SQL statements - Any valid SQL statement ending with semicolon",
        "  begin/commit/rollback - Transaction control",
        "",
        "Database Information:",
        "  .tables - List all tables in the database",
        "  .schema [table_name] - Show schema for a table or all tables",
        "  .info - Show database information and statistics",
        "",
        "Output Control:",
        "  .format [table|json|csv] - Set output format (default: table)",
        "  .limit [N] - Set row limit, 0 for no limit (default: 1000)",
        "  .timing / .notiming - Enable / disable query timing (default: on)",
        "  .timing status - Show whether query timing is on",
        "  .export FILENAME - Export last SELECT query to CSV file",
        "  .import CSV_FILENAME TABLE_NAME - Import CSV file into table",
        "",
        "Bookmarks:",
        "  .bookmark save NAME [DESC] - Save current query as bookmark",
        "  .bookmark list - List all saved bookmarks",
        "  .bookmark run NAME - Execute a saved bookmark",
        "  .bookmark show NAME - Show bookmark details",
        "  .bookmark delete NAME - Delete a bookmark",
        "",
        "Session Management:",
        "  .status - Show transaction status",
        "  .clear - Clear the screen",
        "  .help - Show this help message",
        "  .exit/.quit (or exit/quit) - Exit the REPL",
        "",
        "Features:",
        "  • Multi-line input support (continue until semicolon)",
        "  • Command history with arrow keys",
        "  • Query timing and result pagination",
        "  • Transaction status in prompt (* indicates active transaction)",
        "  • Multiple output formats (table, JSON, CSV)",
        "  • Query bookmarking system",
    ];

    for help_line in HELP_LINES {
        println!("{}", help_line);
    }
}
727
728/// Import CSV data into a table with comprehensive error handling
729fn import_csv_to_table(conn: &Connection, filename: &str, table_name: &str) -> Result<()> {
730    use std::path::Path;
731    
732    // Validate inputs
733    if filename.trim().is_empty() {
734        anyhow::bail!("CSV filename cannot be empty");
735    }
736    
737    if table_name.trim().is_empty() {
738        anyhow::bail!("Table name cannot be empty");
739    }
740    
741    // Validate table name format
742    if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
743        anyhow::bail!("Table name can only contain letters, numbers, and underscores");
744    }
745    
746    // Check if CSV file exists
747    if !Path::new(filename).exists() {
748        anyhow::bail!("CSV file '{}' does not exist", filename);
749    }
750    
751    // Check file extension
752    if !filename.to_lowercase().ends_with(".csv") {
753        eprintln!("Warning: File '{}' doesn't have .csv extension", filename);
754    }
755    
756    println!("Reading CSV file: {}", filename);
757    
758    // Open and read CSV file
759    let mut csv_reader = csv::Reader::from_path(filename)
760        .with_context(|| format!("Failed to open CSV file '{}'. Check file permissions.", filename))?;
761    
762    // Get headers from CSV
763    let headers = csv_reader.headers()
764        .with_context(|| format!("Failed to read CSV headers from '{}'", filename))?;
765    
766    if headers.is_empty() {
767        anyhow::bail!("CSV file '{}' has no headers", filename);
768    }
769    
770    let column_names: Vec<String> = headers.iter().map(|h| h.to_string()).collect();
771    println!("Found {} columns: {}", column_names.len(), column_names.join(", "));
772    
773    // Check if table exists
774    let table_exists = check_table_exists_repl(conn, table_name)?;
775    
776    if table_exists {
777        println!("Table '{}' already exists", table_name);
778        
779        // Verify table has compatible columns
780        verify_table_compatibility(conn, table_name, &column_names)?;
781    } else {
782        println!("Creating table '{}' with inferred schema", table_name);
783        create_table_from_csv_headers(conn, table_name, &column_names)?;
784    }
785    
786    // Prepare insert statement
787    let placeholders = (0..column_names.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
788    let insert_sql = format!("INSERT INTO {} ({}) VALUES ({})", 
789        table_name, 
790        column_names.join(", "), 
791        placeholders
792    );
793    
794    let mut stmt = conn.prepare(&insert_sql)
795        .with_context(|| format!("Failed to prepare insert statement for table '{}'", table_name))?;
796    
797    // Import data with progress tracking
798    let mut row_count = 0;
799    let mut error_count = 0;
800    let start_time = std::time::Instant::now();
801    
802    println!("Importing data...");
803    
804    // Begin transaction for better performance
805    let tx = conn.unchecked_transaction()
806        .context("Failed to begin transaction for import")?;
807    
808    for (record_num, record_result) in csv_reader.records().enumerate() {
809        match record_result {
810            Ok(record) => {
811                let values: Vec<&str> = record.iter().collect();
812                
813                if values.len() != column_names.len() {
814                    error_count += 1;
815                    eprintln!("Row {}: Expected {} columns, got {} - skipping", 
816                        record_num + 2, column_names.len(), values.len());
817                    
818                    if error_count > 100 {
819                        anyhow::bail!("Too many format errors ({}). Import stopped.", error_count);
820                    }
821                    continue;
822                }
823                
824                match stmt.execute(rusqlite::params_from_iter(values)) {
825                    Ok(_) => {
826                        row_count += 1;
827                        
828                        // Progress indicator
829                        if row_count % 10000 == 0 {
830                            let elapsed = start_time.elapsed();
831                            let rate = row_count as f64 / elapsed.as_secs_f64();
832                            println!("Imported {} rows ({:.0} rows/sec)...", row_count, rate);
833                        }
834                    }
835                    Err(e) => {
836                        error_count += 1;
837                        eprintln!("Row {}: Database error - {}", record_num + 2, e);
838                        
839                        if error_count > 100 {
840                            anyhow::bail!("Too many database errors ({}). Import stopped.", error_count);
841                        }
842                    }
843                }
844            }
845            Err(e) => {
846                error_count += 1;
847                eprintln!("Row {}: CSV parsing error - {}", record_num + 2, e);
848                
849                if error_count > 100 {
850                    anyhow::bail!("Too many parsing errors ({}). Import stopped.", error_count);
851                }
852            }
853        }
854    }
855    
856    // Commit transaction
857    tx.commit()
858        .context("Failed to commit import transaction. All changes have been rolled back.")?;
859    
860    let duration = start_time.elapsed();
861    
862    if error_count > 0 {
863        println!("Import completed with {} error(s)", error_count);
864    }
865    
866    println!("Successfully imported {} rows into table '{}' in {:.2} seconds", 
867        row_count, table_name, duration.as_secs_f64());
868    
869    if row_count > 0 {
870        println!("Average: {:.0} rows/second", row_count as f64 / duration.as_secs_f64());
871        
872        // Show sample of imported data
873        let sample_query = format!("SELECT * FROM {} LIMIT 5", table_name);
874        println!("\nSample of imported data:");
875        let options = QueryOptions::default();
876        if let Err(e) = execute_sql(conn, &sample_query, &options) {
877            eprintln!("Could not show sample data: {}", e);
878        }   
879    }
880    
881    Ok(())
882}
883
884fn check_table_exists_repl(conn: &Connection, table_name: &str) -> Result<bool> {
885    let mut stmt = conn.prepare("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1")
886        .context("Failed to prepare table existence check")?;
887    
888    let count: i64 = stmt.query_row(rusqlite::params![table_name], |row| row.get(0))
889        .with_context(|| format!("Failed to check if table '{}' exists", table_name))?;
890    
891    Ok(count > 0)
892}
893
894fn verify_table_compatibility(conn: &Connection, table_name: &str, csv_columns: &[String]) -> Result<()> {
895    // Get table schema
896    let mut stmt = conn.prepare("PRAGMA table_info(?1)")
897        .context("Failed to prepare table info query")?;
898    
899    let table_columns: Result<Vec<String>, _> = stmt.query_map(rusqlite::params![table_name], |row| {
900        let name: String = row.get(1)?;
901        Ok(name)
902    })?.collect();
903    
904    let table_columns = table_columns
905        .with_context(|| format!("Failed to get column information for table '{}'", table_name))?;
906    
907    // Check if CSV columns match table columns
908    let missing_in_table: Vec<String> = csv_columns.iter()
909        .filter(|&col| !table_columns.contains(col))
910        .cloned()
911        .collect();
912    
913    let missing_in_csv: Vec<&String> = table_columns.iter()
914        .filter(|&col| !csv_columns.contains(col))
915        .collect();
916    
917    if !missing_in_table.is_empty() {
918        eprintln!("Warning: CSV has columns not in table: {}", missing_in_table.join(", "));
919    }
920    
921    if !missing_in_csv.is_empty() {
922        let missing_csv_strs: Vec<String> = missing_in_csv.iter().map(|s| s.to_string()).collect();
923        eprintln!("Warning: Table has columns not in CSV: {}", missing_csv_strs.join(", "));
924    }
925    
926    Ok(())
927}
928
929fn create_table_from_csv_headers(conn: &Connection, table_name: &str, column_names: &[String]) -> Result<()> {
930    // Create table with TEXT columns (SQLite will handle type conversions)
931    let column_defs: Vec<String> = column_names.iter()
932        .map(|name| format!("{} TEXT", name))
933        .collect();
934    
935    let create_sql = format!("CREATE TABLE {} ({})", table_name, column_defs.join(", "));
936    
937    conn.execute(&create_sql, [])
938        .with_context(|| format!("Failed to create table '{}' for CSV import", table_name))?;
939    
940    println!("Created table '{}' with {} columns", table_name, column_names.len());
941    
942    Ok(())
943}