1use anyhow::{Context, Result};
2use atty::Stream;
3use rusqlite::Connection;
4use rustyline::DefaultEditor;
5use std::io::{Read, Write};
6use std::path::Path;
7use std::sync::{Arc, Mutex};
8use crate::shell::Shell;
9
10use crate::bookmarks::BookmarkManager;
11use crate::db::list_tables;
12use crate::display::{execute_sql, show_table_schema, show_all_schemas, show_database_info, OutputFormat, QueryOptions};
13use crate::export::export_to_csv;
14use crate::transactions::TransactionManager;
15
16pub fn repl_mode(db_path: &str) -> Result<()> {
18 let db_path = std::fs::canonicalize(db_path)
20 .with_context(|| format!("Failed to resolve absolute path for database '{}'", db_path))?
21 .to_str()
22 .ok_or_else(|| anyhow::anyhow!("Database path contains invalid UTF-8 characters"))?
23 .to_string();
24
25 if !Path::new(&db_path).exists() {
27 anyhow::bail!(
28 "Database '{}' does not exist. Use 'vapor-cli init --name {}' to create it.",
29 db_path,
30 db_path.trim_end_matches(".db")
31 );
32 }
33
34 verify_database_file(&db_path)?;
36
37 let conn = create_robust_connection(&db_path)?;
39
40 if !atty::is(Stream::Stdin) {
42 return handle_non_interactive_mode(&conn);
43 }
44
45 println!("Connected to database: {}", db_path);
46 println!("REPL with timing, bookmarks, and transaction support");
47 print_help_summary();
48
49 let mut rl = match DefaultEditor::new() {
51 Ok(editor) => editor,
52 Err(e) => {
53 eprintln!("Warning: Could not initialize readline editor: {}", e);
54 eprintln!(" Falling back to basic input mode.");
55 return handle_basic_repl_mode(&conn);
56 }
57 };
58
59 let history_path = Path::new(".vapor_history");
61 if history_path.exists() {
62 if let Err(e) = rl.load_history(history_path) {
63 eprintln!("Warning: Could not load command history: {}", e);
64 }
65 }
66
67 let mut multi_line_input = String::new();
68 let last_select_query = Arc::new(Mutex::new(String::new()));
69 let bookmarks = Arc::new(Mutex::new(BookmarkManager::new()
70 .with_context(|| "Failed to initialize bookmarks")?));
71 let transaction_manager = TransactionManager::new();
72 let mut query_options = QueryOptions::default();
73
74 loop {
75 let prompt = get_prompt(&multi_line_input, &transaction_manager);
76
77 let readline = rl.readline(prompt);
78 match readline {
79 Ok(line) => {
80 let line = line.trim();
81 if line.is_empty() && multi_line_input.is_empty() {
82 continue;
83 }
84
85 let command_to_execute = handle_multi_line_input(&mut multi_line_input, line);
87
88 if let Some(command) = command_to_execute {
89 if let Err(e) = rl.add_history_entry(&command) {
91 eprintln!("Warning: Could not add to history: {}", e);
92 }
93
94 if let Err(_e) = handle_special_commands(
96 &command,
97 &conn,
98 &db_path,
99 &bookmarks,
100 &last_select_query,
101 &transaction_manager,
102 &mut query_options,
103 ) {
104 if let Ok(handled) = transaction_manager.handle_sql_command(&conn, &command) {
106 if !handled {
107 if let Err(sql_err) = execute_sql(&conn, &command, &query_options) {
109 print_command_error(&command, &sql_err);
110
111 if is_critical_error(&sql_err) {
113 if offer_reconnection(&db_path) {
114 match create_robust_connection(&db_path) {
115 Ok(_new_conn) => {
116 println!("Reconnected successfully!");
117 }
119 Err(reconnect_err) => {
120 eprintln!("Reconnection failed: {}", reconnect_err);
121 eprintln!("You may need to restart the REPL.");
122 }
123 }
124 }
125 }
126 }
127 }
128 } else {
129 println!("Error handling transaction command");
130 }
131 }
132 } else {
133 if let Err(e) = handle_single_line_command(
135 line,
136 &conn,
137 &db_path,
138 &bookmarks,
139 &last_select_query,
140 &transaction_manager,
141 &mut query_options,
142 &rl,
143 ) {
144 print_command_error(line, &e);
145 }
146 }
147 }
148 Err(rustyline::error::ReadlineError::Interrupted) => {
149 println!("^C");
150 continue;
151 }
152 Err(rustyline::error::ReadlineError::Eof) => {
153 println!("EOF");
154 break;
155 }
156 Err(err) => {
157 eprintln!("Input error: {}", err);
158 eprintln!("Try typing your command again or type 'help' for assistance.");
159 continue;
160 }
161 }
162 }
163
164 cleanup_repl_session(&conn, &transaction_manager, &mut rl, history_path)?;
166 println!("Goodbye!");
167 Ok(())
168}
169
170fn verify_database_file(db_path: &str) -> Result<()> {
171 let metadata = std::fs::metadata(db_path)
172 .with_context(|| format!("Cannot read database file '{}'", db_path))?;
173
174 if metadata.is_dir() {
175 anyhow::bail!("'{}' is a directory, not a database file", db_path);
176 }
177
178 if metadata.len() == 0 {
179 eprintln!("Warning: Database file '{}' is empty", db_path);
180 }
181
182 Ok(())
183}
184
185fn create_robust_connection(db_path: &str) -> Result<Connection> {
186 let mut last_error = None;
187 let max_retries = 3;
188
189 for attempt in 1..=max_retries {
190 match Connection::open(db_path) {
191 Ok(conn) => {
192 if attempt > 1 {
193 println!("Connection succeeded on attempt {}", attempt);
194 }
195 return Ok(conn);
196 }
197 Err(e) => {
198 last_error = Some(e);
199 if attempt < max_retries {
200 println!("Connection attempt {} failed, retrying...", attempt);
201 std::thread::sleep(std::time::Duration::from_millis(100 * attempt as u64));
202 }
203 }
204 }
205 }
206
207 Err(last_error.unwrap())
208 .with_context(|| format!(
209 "Failed to connect to database '{}' after {} attempts. Database may be locked or corrupted.",
210 db_path, max_retries
211 ))
212}
213
214fn handle_non_interactive_mode(conn: &Connection) -> Result<()> {
215 let mut input = String::new();
216 std::io::stdin().read_to_string(&mut input)?;
217
218 let options = QueryOptions::default();
219 execute_sql(conn, &input, &options)
220}
221
222fn handle_basic_repl_mode(conn: &Connection) -> Result<()> {
223 let mut buffer = String::with_capacity(1024); let options = QueryOptions::default();
225 let stdout = std::io::stdout();
226 let mut stdout_handle = stdout.lock(); loop {
229 stdout_handle.write_all(b"vapor> ")?;
230 stdout_handle.flush()?;
231
232 buffer.clear(); if std::io::stdin().read_line(&mut buffer)? == 0 {
234 break;
235 }
236
237 let line = buffer.trim();
238 if line.is_empty() {
239 continue;
240 }
241
242 if let Err(e) = execute_sql(conn, line, &options) {
243 writeln!(stdout_handle, "Error: {}", e)?;
244 }
245 }
246
247 Ok(())
248}
249
250fn get_prompt(multi_line_input: &str, transaction_manager: &TransactionManager) -> &'static str {
251 if multi_line_input.is_empty() {
252 if transaction_manager.is_active() {
253 "*> "
254 } else {
255 "> "
256 }
257 } else {
258 "... "
259 }
260}
261
262fn handle_multi_line_input(multi_line_input: &mut String, line: &str) -> Option<String> {
263 if !multi_line_input.is_empty() {
264 multi_line_input.push_str(" ");
265 multi_line_input.push_str(line);
266 if line.ends_with(';') {
267 let command = multi_line_input.trim().to_string();
268 multi_line_input.clear();
269 Some(command)
270 } else {
271 None
272 }
273 } else if line.ends_with(';') || is_complete_command(line) {
274 Some(line.to_string())
275 } else {
276 multi_line_input.push_str(line);
277 None
278 }
279}
280
281fn is_complete_command(line: &str) -> bool {
282 let line_lower = line.to_lowercase();
283 matches!(line_lower.as_str(),
285 "exit" | "quit" | "help" | "tables" | "clear" | "info"
286 ) || line_lower.starts_with("schema") ||
287 line_lower.starts_with(".") ||
288 line_lower.starts_with("begin") ||
289 line_lower.starts_with("commit") ||
290 line_lower.starts_with("rollback") ||
291 line_lower.starts_with("drop")
292}
293
294fn print_help_summary() {
295 println!("Vapor CLI - SQLite Database Management");
296 println!("\nSpecial Commands:");
297 println!(" .help Show this help message");
298 println!(" .tables List all tables");
299 println!(" .schema [table] Show schema for all tables or specific table");
300 println!(" .info Show database information");
301 println!(" .format [type] Set output format (table, json, csv)");
302 println!(" .limit [n] Set row limit (0 for no limit)");
303 println!(" .timing Enable query timing");
304 println!(" .notiming Disable query timing");
305 println!(" .clear Clear screen");
306 println!(" .exit/.quit Exit REPL");
307 println!("\nSQL Commands:");
308 println!(" Enter any valid SQL command ending with semicolon");
309 println!(" Example: SELECT * FROM users;");
310}
311
312fn print_command_error(command: &str, error: &anyhow::Error) {
313 eprintln!("Error executing command '{}':", command);
314 eprintln!("{}", error);
315}
316
317fn is_critical_error(error: &anyhow::Error) -> bool {
318 let error_msg = error.to_string().to_lowercase();
319 error_msg.contains("database is locked") ||
320 error_msg.contains("connection") ||
321 error_msg.contains("i/o error") ||
322 error_msg.contains("disk")
323}
324
325fn offer_reconnection(db_path: &str) -> bool {
326 print!("Would you like to try reconnecting to '{}'? (y/N): ", db_path);
327 std::io::stdout().flush().unwrap_or(());
328
329 let mut input = String::new();
330 if std::io::stdin().read_line(&mut input).is_ok() {
331 input.trim().to_lowercase().starts_with('y')
332 } else {
333 false
334 }
335}
336
337fn cleanup_repl_session(
338 conn: &Connection,
339 transaction_manager: &TransactionManager,
340 rl: &mut DefaultEditor,
341 history_path: &Path,
342) -> Result<()> {
343 if transaction_manager.is_active() {
345 println!("Rolling back active transaction...");
346 transaction_manager.rollback_transaction(conn)?;
347 }
348
349 if let Err(e) = rl.save_history(history_path) {
351 eprintln!("Warning: Could not save command history: {}", e);
352 }
353
354 Ok(())
355}
356
357fn handle_special_commands(
358 command: &str,
359 conn: &Connection,
360 db_path: &str,
361 bookmarks: &Arc<Mutex<BookmarkManager>>,
362 last_select_query: &Arc<Mutex<String>>,
363 transaction_manager: &TransactionManager,
364 query_options: &mut QueryOptions,
365) -> Result<()> {
366 let command = command.trim();
367
368 match command {
369 ".help" => {
370 show_help();
371 Ok(())
372 }
373 ".shell" => {
374 let mut shell = Shell::new(db_path);
375 shell.run();
376 Ok(())
377 }
378 ".exit" | ".quit" | "exit" | "quit" => {
379 std::process::exit(0);
380 }
381 ".tables" => {
382 let _ = list_tables(db_path)?;
383 Ok(())
384 }
385 ".clear" => {
386 print!("\x1B[2J\x1B[1;1H");
387 std::io::stdout().flush().context("Failed to flush stdout")?;
388 Ok(())
389 }
390 ".info" => {
391 show_database_info(conn, db_path)?;
392 Ok(())
393 }
394 ".format" => {
395 println!("Current format: {:?}", query_options.format);
396 println!("Available formats: table, json, csv");
397 Ok(())
398 }
399 ".format table" => {
400 query_options.format = OutputFormat::Table;
401 Ok(())
402 }
403 ".format json" => {
404 query_options.format = OutputFormat::Json;
405 Ok(())
406 }
407 ".format csv" => {
408 query_options.format = OutputFormat::Csv;
409 Ok(())
410 }
411 ".limit" => {
412 match query_options.max_rows {
413 None => println!("No row limit set"),
414 Some(n) => println!("Current row limit: {}", n),
415 }
416 Ok(())
417 }
418 ".limit 0" => {
419 query_options.max_rows = None;
420 println!("Row limit removed");
421 Ok(())
422 }
423 limit_cmd if limit_cmd.starts_with(".limit ") => {
424 if let Ok(n) = limit_cmd.split_whitespace().nth(1).unwrap().parse::<usize>() {
425 query_options.max_rows = Some(n);
426 println!("Row limit set to {}", n);
427 } else {
428 println!("Invalid limit value. Use a positive number or 0 for no limit.");
429 }
430 Ok(())
431 }
432 ".timing" => {
433 query_options.show_timing = true;
434 println!("Query timing enabled");
435 Ok(())
436 }
437 ".notiming" => {
438 query_options.show_timing = false;
439 println!("Query timing disabled");
440 Ok(())
441 }
442 ".timing status" => {
443 println!("Query timing: {}", if query_options.show_timing { "on" } else { "off" });
444 Ok(())
445 }
446 ".export" => {
447 let parts: Vec<&str> = command.split_whitespace().collect();
448 if parts.len() > 1 {
449 let filename = parts[1];
450 let query = last_select_query.lock().unwrap().clone();
451 if query.is_empty() {
452 println!("No SELECT query has been executed yet.");
453 } else {
454 export_to_csv(conn, &query, filename)?;
455 }
456 } else {
457 println!("Usage: .export FILENAME");
458 }
459 Ok(())
460 }
461 ".import" => {
462 let parts: Vec<&str> = command.split_whitespace().collect();
463 if parts.len() >= 3 {
464 let filename = parts[1];
465 let table_name = parts[2];
466 import_csv_to_table(conn, filename, table_name)?;
467 } else {
468 println!("Usage: .import CSV_FILENAME TABLE_NAME");
469 println!("Example: .import data.csv employees");
470 }
471 Ok(())
472 }
473 ".bookmark" => {
474 handle_bookmark_command(command, bookmarks, last_select_query, conn, query_options)?;
475 Ok(())
476 }
477 schema_cmd if schema_cmd.starts_with(".schema") => {
478 let parts: Vec<&str> = schema_cmd.split_whitespace().collect();
479 if parts.len() == 1 {
480 show_all_schemas(conn)?;
482 } else if parts.len() == 2 {
483 show_table_schema(conn, parts[1])?;
485 } else {
486 println!("Usage: .schema [table_name]");
487 }
488 Ok(())
489 }
490 ".status" => {
491 transaction_manager.show_status();
492 Ok(())
493 }
494 _ => {
495 if command.to_lowercase().starts_with("drop") {
497 let parts: Vec<&str> = command.split_whitespace().collect();
498 if parts.len() < 2 {
499 println!("Usage: DROP TABLE table_name; or DROP table_name;");
500 Ok(())
501 } else {
502 let table_name = if parts[1].to_lowercase() == "table" {
503 if parts.len() < 3 {
504 println!("Usage: DROP TABLE table_name;");
505 return Ok(());
506 }
507 parts[2].trim_end_matches(';')
508 } else {
509 parts[1].trim_end_matches(';')
510 };
511
512 if !check_table_exists_repl(conn, table_name)? {
514 println!("Table '{}' does not exist", table_name);
515 Ok(())
516 } else {
517 conn.execute(&format!("DROP TABLE {}", table_name), [])
519 .with_context(|| format!("Failed to drop table '{}'", table_name))?;
520
521 println!("Table '{}' dropped successfully", table_name);
522 Ok(())
523 }
524 }
525 } else {
526 execute_sql(conn, command, query_options)?;
528 Ok(())
529 }
530 }
531 }
532}
533
534fn handle_single_line_command(
535 line: &str,
536 conn: &Connection,
537 db_path: &str,
538 _bookmarks: &Arc<Mutex<BookmarkManager>>,
539 _last_select_query: &Arc<Mutex<String>>,
540 _transaction_manager: &TransactionManager,
541 query_options: &mut QueryOptions,
542 _rl: &DefaultEditor,
543) -> Result<()> {
544 match line.trim() {
545 ".help" => print_help_summary(),
546 ".tables" => { let _ = list_tables(db_path)?; },
547 ".schema" => {
548 if line.contains(' ') {
549 let table_name = line.split_whitespace().nth(1).unwrap();
550 show_table_schema(conn, table_name)?;
551 } else {
552 show_all_schemas(conn)?;
553 }
554 }
555 ".info" => show_database_info(conn, db_path)?,
556 ".format" => {
557 println!("Current format: {:?}", query_options.format);
558 println!("Available formats: table, json, csv");
559 }
560 ".format table" => query_options.format = OutputFormat::Table,
561 ".format json" => query_options.format = OutputFormat::Json,
562 ".format csv" => query_options.format = OutputFormat::Csv,
563 ".limit" => {
564 match query_options.max_rows {
565 None => println!("No row limit set"),
566 Some(n) => println!("Current row limit: {}", n),
567 }
568 }
569 ".limit 0" => {
570 query_options.max_rows = None;
571 println!("Row limit removed");
572 }
573 limit_cmd if limit_cmd.starts_with(".limit ") => {
574 if let Ok(n) = limit_cmd.split_whitespace().nth(1).unwrap().parse::<usize>() {
575 query_options.max_rows = Some(n);
576 println!("Row limit set to {}", n);
577 } else {
578 println!("Invalid limit value. Use a positive number or 0 for no limit.");
579 }
580 }
581 ".timing" => {
582 query_options.show_timing = true;
583 println!("Query timing enabled");
584 }
585 ".notiming" => {
586 query_options.show_timing = false;
587 println!("Query timing disabled");
588 }
589 ".timing status" => {
590 println!("Query timing: {}", if query_options.show_timing { "on" } else { "off" });
591 }
592 ".clear" => {
593 print!("\x1B[2J\x1B[1;1H");
594 }
595 ".exit" | ".quit" | "exit" | "quit" => {
596 std::process::exit(0);
597 }
598 _ => {
599 execute_sql(conn, line, query_options)?;
601 }
602 }
603 Ok(())
604}
605
606fn handle_bookmark_command(
607 line: &str,
608 bookmarks: &Arc<Mutex<BookmarkManager>>,
609 last_select_query: &Arc<Mutex<String>>,
610 conn: &Connection,
611 query_options: &QueryOptions,
612) -> Result<()> {
613 let parts: Vec<&str> = line.split_whitespace().collect();
614 if parts.len() < 2 {
615 println!("Usage: .bookmark [save|list|run|show|delete] [args...]");
616 return Ok(());
617 }
618
619 let mut bookmarks = bookmarks.lock().unwrap();
620
621 match parts[1] {
622 "save" => {
623 if parts.len() < 3 {
624 println!("Usage: .bookmark save NAME [DESCRIPTION]");
625 return Ok(());
626 }
627 let name = parts[2].to_string();
628 let description = if parts.len() > 3 {
629 Some(parts[3..].join(" "))
630 } else {
631 None
632 };
633 let query = last_select_query.lock().unwrap().clone();
634 if query.is_empty() {
635 println!("No query to save. Execute a query first.");
636 } else {
637 bookmarks.save_bookmark(name.clone(), query, description)?;
638 println!("Bookmark '{}' saved.", name);
639 }
640 },
641 "list" => {
642 bookmarks.list_bookmarks();
643 },
644 "run" => {
645 if parts.len() < 3 {
646 println!("Usage: .bookmark run NAME");
647 return Ok(());
648 }
649 let name = parts[2];
650 if let Some(bookmark) = bookmarks.get_bookmark(name) {
651 println!("Executing bookmark '{}': {}", name, bookmark.query);
652 execute_sql(conn, &bookmark.query, query_options)?;
653 } else {
654 println!("Bookmark '{}' not found.", name);
655 }
656 },
657 "show" => {
658 if parts.len() < 3 {
659 println!("Usage: .bookmark show NAME");
660 return Ok(());
661 }
662 let name = parts[2];
663 if bookmarks.show_bookmark(name).is_none() {
664 println!("Bookmark '{}' not found.", name);
665 }
666 },
667 "delete" => {
668 if parts.len() < 3 {
669 println!("Usage: .bookmark delete NAME");
670 return Ok(());
671 }
672 let name = parts[2];
673 if bookmarks.delete_bookmark(name)? {
674 println!("Bookmark '{}' deleted.", name);
675 } else {
676 println!("Bookmark '{}' not found.", name);
677 }
678 },
679 _ => {
680 println!("Unknown bookmark command. Use: save, list, run, show, or delete");
681 }
682 }
683 Ok(())
684}
685
686pub fn show_help() {
688 println!("Enhanced REPL Commands:");
689 println!();
690 println!("SQL Operations:");
691 println!(" SQL statements - Any valid SQL statement ending with semicolon");
692 println!(" begin/commit/rollback - Transaction control");
693 println!();
694 println!("Database Information:");
695 println!(" tables - List all tables in the database");
696 println!(" schema [table_name] - Show schema for a table or all tables");
697 println!(" info - Show database information and statistics");
698 println!();
699 println!("Output Control:");
700 println!(" .format [table|json|csv] - Set output format (default: table)");
701 println!(" .limit [N] - Set row limit, 0 for no limit (default: 1000)");
702 println!(" .timing [on|off] - Toggle query timing (default: on)");
703 println!(" .export FILENAME - Export last SELECT query to CSV file");
704 println!(" .import CSV_FILENAME TABLE_NAME - Import CSV file into table");
705 println!();
706 println!("Bookmarks:");
707 println!(" .bookmark save NAME [DESC] - Save current query as bookmark");
708 println!(" .bookmark list - List all saved bookmarks");
709 println!(" .bookmark run NAME - Execute a saved bookmark");
710 println!(" .bookmark show NAME - Show bookmark details");
711 println!(" .bookmark delete NAME - Delete a bookmark");
712 println!();
713 println!("Session Management:");
714 println!(" .status - Show transaction status");
715 println!(" clear - Clear the screen");
716 println!(" help - Show this help message");
717 println!(" exit/quit - Exit the REPL");
718 println!();
719 println!("Features:");
720 println!(" • Multi-line input support (continue until semicolon)");
721 println!(" • Command history with arrow keys");
722 println!(" • Query timing and result pagination");
723 println!(" • Transaction status in prompt (* indicates active transaction)");
724 println!(" • Multiple output formats (table, JSON, CSV)");
725 println!(" • Query bookmarking system");
726}
727
728fn import_csv_to_table(conn: &Connection, filename: &str, table_name: &str) -> Result<()> {
730 use std::path::Path;
731
732 if filename.trim().is_empty() {
734 anyhow::bail!("CSV filename cannot be empty");
735 }
736
737 if table_name.trim().is_empty() {
738 anyhow::bail!("Table name cannot be empty");
739 }
740
741 if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
743 anyhow::bail!("Table name can only contain letters, numbers, and underscores");
744 }
745
746 if !Path::new(filename).exists() {
748 anyhow::bail!("CSV file '{}' does not exist", filename);
749 }
750
751 if !filename.to_lowercase().ends_with(".csv") {
753 eprintln!("Warning: File '{}' doesn't have .csv extension", filename);
754 }
755
756 println!("Reading CSV file: {}", filename);
757
758 let mut csv_reader = csv::Reader::from_path(filename)
760 .with_context(|| format!("Failed to open CSV file '{}'. Check file permissions.", filename))?;
761
762 let headers = csv_reader.headers()
764 .with_context(|| format!("Failed to read CSV headers from '{}'", filename))?;
765
766 if headers.is_empty() {
767 anyhow::bail!("CSV file '{}' has no headers", filename);
768 }
769
770 let column_names: Vec<String> = headers.iter().map(|h| h.to_string()).collect();
771 println!("Found {} columns: {}", column_names.len(), column_names.join(", "));
772
773 let table_exists = check_table_exists_repl(conn, table_name)?;
775
776 if table_exists {
777 println!("Table '{}' already exists", table_name);
778
779 verify_table_compatibility(conn, table_name, &column_names)?;
781 } else {
782 println!("Creating table '{}' with inferred schema", table_name);
783 create_table_from_csv_headers(conn, table_name, &column_names)?;
784 }
785
786 let placeholders = (0..column_names.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
788 let insert_sql = format!("INSERT INTO {} ({}) VALUES ({})",
789 table_name,
790 column_names.join(", "),
791 placeholders
792 );
793
794 let mut stmt = conn.prepare(&insert_sql)
795 .with_context(|| format!("Failed to prepare insert statement for table '{}'", table_name))?;
796
797 let mut row_count = 0;
799 let mut error_count = 0;
800 let start_time = std::time::Instant::now();
801
802 println!("Importing data...");
803
804 let tx = conn.unchecked_transaction()
806 .context("Failed to begin transaction for import")?;
807
808 for (record_num, record_result) in csv_reader.records().enumerate() {
809 match record_result {
810 Ok(record) => {
811 let values: Vec<&str> = record.iter().collect();
812
813 if values.len() != column_names.len() {
814 error_count += 1;
815 eprintln!("Row {}: Expected {} columns, got {} - skipping",
816 record_num + 2, column_names.len(), values.len());
817
818 if error_count > 100 {
819 anyhow::bail!("Too many format errors ({}). Import stopped.", error_count);
820 }
821 continue;
822 }
823
824 match stmt.execute(rusqlite::params_from_iter(values)) {
825 Ok(_) => {
826 row_count += 1;
827
828 if row_count % 10000 == 0 {
830 let elapsed = start_time.elapsed();
831 let rate = row_count as f64 / elapsed.as_secs_f64();
832 println!("Imported {} rows ({:.0} rows/sec)...", row_count, rate);
833 }
834 }
835 Err(e) => {
836 error_count += 1;
837 eprintln!("Row {}: Database error - {}", record_num + 2, e);
838
839 if error_count > 100 {
840 anyhow::bail!("Too many database errors ({}). Import stopped.", error_count);
841 }
842 }
843 }
844 }
845 Err(e) => {
846 error_count += 1;
847 eprintln!("Row {}: CSV parsing error - {}", record_num + 2, e);
848
849 if error_count > 100 {
850 anyhow::bail!("Too many parsing errors ({}). Import stopped.", error_count);
851 }
852 }
853 }
854 }
855
856 tx.commit()
858 .context("Failed to commit import transaction. All changes have been rolled back.")?;
859
860 let duration = start_time.elapsed();
861
862 if error_count > 0 {
863 println!("Import completed with {} error(s)", error_count);
864 }
865
866 println!("Successfully imported {} rows into table '{}' in {:.2} seconds",
867 row_count, table_name, duration.as_secs_f64());
868
869 if row_count > 0 {
870 println!("Average: {:.0} rows/second", row_count as f64 / duration.as_secs_f64());
871
872 let sample_query = format!("SELECT * FROM {} LIMIT 5", table_name);
874 println!("\nSample of imported data:");
875 let options = QueryOptions::default();
876 if let Err(e) = execute_sql(conn, &sample_query, &options) {
877 eprintln!("Could not show sample data: {}", e);
878 }
879 }
880
881 Ok(())
882}
883
884fn check_table_exists_repl(conn: &Connection, table_name: &str) -> Result<bool> {
885 let mut stmt = conn.prepare("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1")
886 .context("Failed to prepare table existence check")?;
887
888 let count: i64 = stmt.query_row(rusqlite::params![table_name], |row| row.get(0))
889 .with_context(|| format!("Failed to check if table '{}' exists", table_name))?;
890
891 Ok(count > 0)
892}
893
894fn verify_table_compatibility(conn: &Connection, table_name: &str, csv_columns: &[String]) -> Result<()> {
895 let mut stmt = conn.prepare("PRAGMA table_info(?1)")
897 .context("Failed to prepare table info query")?;
898
899 let table_columns: Result<Vec<String>, _> = stmt.query_map(rusqlite::params![table_name], |row| {
900 let name: String = row.get(1)?;
901 Ok(name)
902 })?.collect();
903
904 let table_columns = table_columns
905 .with_context(|| format!("Failed to get column information for table '{}'", table_name))?;
906
907 let missing_in_table: Vec<String> = csv_columns.iter()
909 .filter(|&col| !table_columns.contains(col))
910 .cloned()
911 .collect();
912
913 let missing_in_csv: Vec<&String> = table_columns.iter()
914 .filter(|&col| !csv_columns.contains(col))
915 .collect();
916
917 if !missing_in_table.is_empty() {
918 eprintln!("Warning: CSV has columns not in table: {}", missing_in_table.join(", "));
919 }
920
921 if !missing_in_csv.is_empty() {
922 let missing_csv_strs: Vec<String> = missing_in_csv.iter().map(|s| s.to_string()).collect();
923 eprintln!("Warning: Table has columns not in CSV: {}", missing_csv_strs.join(", "));
924 }
925
926 Ok(())
927}
928
929fn create_table_from_csv_headers(conn: &Connection, table_name: &str, column_names: &[String]) -> Result<()> {
930 let column_defs: Vec<String> = column_names.iter()
932 .map(|name| format!("{} TEXT", name))
933 .collect();
934
935 let create_sql = format!("CREATE TABLE {} ({})", table_name, column_defs.join(", "));
936
937 conn.execute(&create_sql, [])
938 .with_context(|| format!("Failed to create table '{}' for CSV import", table_name))?;
939
940 println!("Created table '{}' with {} columns", table_name, column_names.len());
941
942 Ok(())
943}