Skip to main content

csvdb/commands/
to_sqlite.rs

1use anyhow::{bail, Context, Result};
2use indicatif::{ProgressBar, ProgressStyle};
3use rusqlite::Connection;
4use std::fs;
5use std::io::IsTerminal;
6use std::path::{Path, PathBuf};
7use std::process::Command;
8
9use crate::core::csv::read_table_csv;
10use crate::core::{InputFormat, Schema};
11use crate::{NullMode, OrderMode, TableFilter};
12
13/// Convert any supported format to a SQLite database.
14pub fn to_sqlite(input_path: &Path, force: bool, filter: &TableFilter) -> Result<PathBuf> {
15    let input_format = InputFormat::from_path(input_path)?;
16
17    // For non-csvdb formats, convert to csvdb first in a temp directory
18    let (csvdb_dir, _temp_dir) = match input_format {
19        InputFormat::Csvdb => (input_path.to_path_buf(), None),
20        _ => {
21            let temp_dir = tempfile::tempdir()?;
22            let temp_csvdb = temp_dir.path().join("temp.csvdb");
23            crate::commands::to_csv::to_csv(
24                input_path,
25                OrderMode::Pk,
26                NullMode::Marker,
27                Some(&temp_csvdb),
28                true,
29                filter,
30            )?;
31            (temp_csvdb, Some(temp_dir))
32        }
33    };
34
35    let schema_path = csvdb_dir.join("schema.sql");
36    let schema = Schema::from_schema_sql(&schema_path)?;
37
38    // Determine output path based on input
39    let stem = input_path
40        .file_stem()
41        .and_then(|s| s.to_str())
42        .unwrap_or("database");
43    let stem = stem
44        .strip_suffix(".csvdb")
45        .or_else(|| stem.strip_suffix(".parquetdb"))
46        .unwrap_or(stem);
47
48    let db_name = format!("{}.sqlite", stem);
49    let db_path = input_path
50        .parent()
51        .unwrap_or(Path::new("."))
52        .join(db_name);
53
54    // Check for existing database
55    if db_path.exists() {
56        if !force {
57            bail!(
58                "Output file already exists: {}\nUse --force to overwrite.",
59                db_path.display()
60            );
61        }
62        fs::remove_file(&db_path)?;
63    }
64
65    // Try to use sqlite3 CLI for fast import, fall back to rusqlite if unavailable
66    if sqlite3_available() {
67        to_sqlite_via_cli(&csvdb_dir, &db_path, &schema_path, &schema, filter)
68    } else {
69        to_sqlite_via_rusqlite(&csvdb_dir, &db_path, &schema_path, &schema, filter)
70    }
71}
72
/// Report whether a usable `sqlite3` executable can be launched.
///
/// Runs `sqlite3 --version` and returns `true` only when the process could be
/// spawned and exited successfully; any spawn failure counts as unavailable.
fn sqlite3_available() -> bool {
    let probe = Command::new("sqlite3").arg("--version").output();
    match probe {
        Ok(finished) => finished.status.success(),
        Err(_) => false,
    }
}
81
82/// Fast import using sqlite3 CLI's .import command
83fn to_sqlite_via_cli(
84    csvdb_dir: &Path,
85    db_path: &Path,
86    schema_path: &Path,
87    schema: &Schema,
88    filter: &TableFilter,
89) -> Result<PathBuf> {
90    use std::io::Write;
91
92    let abs_db_path = if db_path.is_absolute() {
93        db_path.to_path_buf()
94    } else {
95        std::env::current_dir()?.join(db_path)
96    };
97
98    // Build sqlite3 commands
99    let mut commands = String::new();
100
101    // Read and execute schema
102    let abs_schema_path = schema_path.canonicalize()
103        .with_context(|| format!("Failed to get absolute path: {}", schema_path.display()))?;
104    commands.push_str(&format!(".read '{}'\n", abs_schema_path.display()));
105
106    // Import each CSV file (filtered)
107    commands.push_str(".mode csv\n");
108    for table_name in schema.tables.keys() {
109        if !filter.matches(table_name) {
110            continue;
111        }
112        let csv_path = csvdb_dir.join(format!("{}.csv", table_name));
113        if csv_path.exists() {
114            let abs_csv_path = csv_path.canonicalize()
115                .with_context(|| format!("Failed to get absolute path: {}", csv_path.display()))?;
116            // .import with --skip 1 to skip header row
117            commands.push_str(&format!(
118                ".import --skip 1 '{}' {}\n",
119                abs_csv_path.display(),
120                table_name
121            ));
122        }
123    }
124
125    // Convert \N markers to actual NULL values.
126    // sqlite3's .nullvalue does not affect .import, so we fix up after import.
127    for (table_name, table_schema) in &schema.tables {
128        if !filter.matches(table_name) {
129            continue;
130        }
131        for col in &table_schema.columns {
132            commands.push_str(&format!(
133                "UPDATE \"{}\" SET \"{}\" = NULL WHERE \"{}\" = '\\N';\n",
134                table_name, col.name, col.name
135            ));
136        }
137    }
138
139    // Run sqlite3 with commands via stdin
140    let mut child = Command::new("sqlite3")
141        .arg(&abs_db_path)
142        .stdin(std::process::Stdio::piped())
143        .stdout(std::process::Stdio::piped())
144        .stderr(std::process::Stdio::piped())
145        .spawn()
146        .context("Failed to start sqlite3")?;
147
148    // Write commands to stdin
149    if let Some(mut stdin) = child.stdin.take() {
150        stdin.write_all(commands.as_bytes())
151            .context("Failed to write to sqlite3 stdin")?;
152    }
153
154    let output = child.wait_with_output()
155        .context("Failed to run sqlite3")?;
156
157    if !output.status.success() {
158        let stderr = String::from_utf8_lossy(&output.stderr);
159        bail!("sqlite3 import failed: {}", stderr);
160    }
161
162    Ok(db_path.to_path_buf())
163}
164
165/// Fallback import using rusqlite (slower but always available)
166fn to_sqlite_via_rusqlite(
167    csvdb_dir: &Path,
168    db_path: &Path,
169    schema_path: &Path,
170    schema: &Schema,
171    filter: &TableFilter,
172) -> Result<PathBuf> {
173    let conn = Connection::open(db_path)
174        .with_context(|| format!("Failed to create database: {}", db_path.display()))?;
175
176    // Create tables from schema
177    let schema_sql = fs::read_to_string(schema_path)?;
178    for stmt in schema_sql.split(';') {
179        let stmt = stmt.trim();
180        if !stmt.is_empty() {
181            conn.execute(stmt, [])
182                .with_context(|| format!("Failed to execute: {}", stmt))?;
183        }
184    }
185
186    // Import data from CSV files (within a transaction for performance)
187    let pb = if std::io::stderr().is_terminal() {
188        let pb = ProgressBar::new(schema.tables.len() as u64);
189        pb.set_style(
190            ProgressStyle::default_bar()
191                .template("[{bar:40}] {pos}/{len} {msg}")
192                .unwrap(),
193        );
194        pb
195    } else {
196        ProgressBar::hidden()
197    };
198    conn.execute("BEGIN TRANSACTION", [])?;
199    for (table_name, table_schema) in &schema.tables {
200        if !filter.matches(table_name) {
201            pb.inc(1);
202            continue;
203        }
204        pb.set_message(table_name.clone());
205        let csv_path = csvdb_dir.join(format!("{}.csv", table_name));
206        if csv_path.exists() {
207            let table = read_table_csv(&csv_path, table_schema)?;
208            table.write_to_sqlite(&conn)?;
209        }
210        pb.inc(1);
211    }
212    conn.execute("COMMIT", [])?;
213    pb.finish_and_clear();
214
215    Ok(db_path.to_path_buf())
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::to_csv::to_csv;
    use crate::{NullMode, OrderMode, TableFilter};
    use tempfile::tempdir;

    /// Schema used by the hand-written csvdb fixtures: one table `t` with a
    /// single integer primary-key column `id`.
    const FIXTURE_SCHEMA: &str = "CREATE TABLE \"t\" (\n    \"id\" INTEGER PRIMARY KEY\n);\n";

    /// Filter constructed from two empty lists, as used by every test here.
    fn empty_filter() -> TableFilter {
        TableFilter::new(vec![], vec![])
    }

    /// Create `<parent>/<dir_name>` containing `schema.sql` and a one-row
    /// `t.csv`, and return the new directory's path.
    fn write_fixture(parent: &Path, dir_name: &str) -> Result<PathBuf> {
        let fixture_dir = parent.join(dir_name);
        fs::create_dir(&fixture_dir)?;
        fs::write(fixture_dir.join("schema.sql"), FIXTURE_SCHEMA)?;
        fs::write(fixture_dir.join("t.csv"), "id\n1\n")?;
        Ok(fixture_dir)
    }

    /// SQLite -> csvdb -> SQLite keeps the row count and row contents.
    #[test]
    fn test_roundtrip() -> Result<()> {
        let workspace = tempdir()?;
        let original_db = workspace.path().join("test.sqlite");

        // Create test database; the connection is closed at the end of the scope.
        {
            let seed = Connection::open(&original_db)?;
            seed.execute(
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
                [],
            )?;
            for insert in [
                "INSERT INTO users VALUES (1, 'Alice')",
                "INSERT INTO users VALUES (2, 'Bob')",
            ] {
                seed.execute(insert, [])?;
            }
        }

        // Convert to CSV
        let filter = empty_filter();
        let exported = to_csv(&original_db, OrderMode::Pk, NullMode::Marker, None, true, &filter)?;

        // Remove original database
        fs::remove_file(&original_db)?;

        // Rebuild from CSV
        let rebuilt = to_sqlite(&exported, true, &empty_filter())?;

        // Verify data
        let check = Connection::open(&rebuilt)?;
        let row_count: i64 = check.query_row("SELECT COUNT(*) FROM users", [], |row| row.get(0))?;
        assert_eq!(row_count, 2);

        let first_name: String =
            check.query_row("SELECT name FROM users WHERE id = 1", [], |row| row.get(0))?;
        assert_eq!(first_name, "Alice");

        Ok(())
    }

    /// Input directory `foo.csvdb` yields an output named `foo.sqlite`.
    #[test]
    fn test_output_path_csvdb_suffix() -> Result<()> {
        let workspace = tempdir()?;
        let fixture = write_fixture(workspace.path(), "foo.csvdb")?;

        let produced = to_sqlite(&fixture, true, &empty_filter())?;
        let file_name = produced.file_name().unwrap().to_str().unwrap();
        assert!(file_name.ends_with("foo.sqlite"));
        Ok(())
    }

    /// Input "bar" (no .csvdb suffix) -> output "bar.sqlite".
    #[test]
    fn test_output_path_no_suffix() -> Result<()> {
        let workspace = tempdir()?;
        let fixture = write_fixture(workspace.path(), "bar")?;

        let produced = to_sqlite(&fixture, true, &empty_filter())?;
        let file_name = produced.file_name().unwrap().to_str().unwrap();
        assert!(file_name.ends_with("bar.sqlite"));
        Ok(())
    }

    /// With `force` set, converting twice to the same output succeeds.
    #[test]
    fn test_force_overwrites() -> Result<()> {
        let workspace = tempdir()?;
        let fixture = write_fixture(workspace.path(), "f.csvdb")?;

        // First create
        let first_output = to_sqlite(&fixture, true, &empty_filter())?;
        assert!(first_output.exists());

        // Force overwrite should succeed
        let second_output = to_sqlite(&fixture, true, &empty_filter())?;
        assert!(second_output.exists());
        Ok(())
    }

    /// Without `force`, an existing output is rejected with a hint to use --force.
    #[test]
    fn test_no_force_rejects_existing() -> Result<()> {
        let workspace = tempdir()?;
        let fixture = write_fixture(workspace.path(), "nf.csvdb")?;

        // First create with force
        to_sqlite(&fixture, true, &empty_filter())?;

        // Second create without force should fail
        let outcome = to_sqlite(&fixture, false, &empty_filter());
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("--force"));
        Ok(())
    }
}