1use anyhow::{bail, Context, Result};
2use indicatif::{ProgressBar, ProgressStyle};
3use rusqlite::Connection;
4use std::fs;
5use std::io::IsTerminal;
6use std::path::{Path, PathBuf};
7use std::process::Command;
8
9use crate::core::csv::read_table_csv;
10use crate::core::{InputFormat, Schema};
11use crate::{NullMode, OrderMode, TableFilter};
12
13pub fn to_sqlite(input_path: &Path, force: bool, filter: &TableFilter) -> Result<PathBuf> {
15 let input_format = InputFormat::from_path(input_path)?;
16
17 let (csvdb_dir, _temp_dir) = match input_format {
19 InputFormat::Csvdb => (input_path.to_path_buf(), None),
20 _ => {
21 let temp_dir = tempfile::tempdir()?;
22 let temp_csvdb = temp_dir.path().join("temp.csvdb");
23 crate::commands::to_csv::to_csv(
24 input_path,
25 OrderMode::Pk,
26 NullMode::Marker,
27 Some(&temp_csvdb),
28 true,
29 filter,
30 )?;
31 (temp_csvdb, Some(temp_dir))
32 }
33 };
34
35 let schema_path = csvdb_dir.join("schema.sql");
36 let schema = Schema::from_schema_sql(&schema_path)?;
37
38 let stem = input_path
40 .file_stem()
41 .and_then(|s| s.to_str())
42 .unwrap_or("database");
43 let stem = stem
44 .strip_suffix(".csvdb")
45 .or_else(|| stem.strip_suffix(".parquetdb"))
46 .unwrap_or(stem);
47
48 let db_name = format!("{}.sqlite", stem);
49 let db_path = input_path
50 .parent()
51 .unwrap_or(Path::new("."))
52 .join(db_name);
53
54 if db_path.exists() {
56 if !force {
57 bail!(
58 "Output file already exists: {}\nUse --force to overwrite.",
59 db_path.display()
60 );
61 }
62 fs::remove_file(&db_path)?;
63 }
64
65 if sqlite3_available() {
67 to_sqlite_via_cli(&csvdb_dir, &db_path, &schema_path, &schema, filter)
68 } else {
69 to_sqlite_via_rusqlite(&csvdb_dir, &db_path, &schema_path, &schema, filter)
70 }
71}
72
/// Reports whether a working `sqlite3` shell can be launched.
///
/// Probes with `sqlite3 --version`. A failure to spawn the process (e.g. the
/// binary is not installed) and a non-zero exit status both count as
/// "not available".
fn sqlite3_available() -> bool {
    match Command::new("sqlite3").arg("--version").output() {
        Ok(probe) => probe.status.success(),
        Err(_) => false,
    }
}
81
82fn to_sqlite_via_cli(
84 csvdb_dir: &Path,
85 db_path: &Path,
86 schema_path: &Path,
87 schema: &Schema,
88 filter: &TableFilter,
89) -> Result<PathBuf> {
90 use std::io::Write;
91
92 let abs_db_path = if db_path.is_absolute() {
93 db_path.to_path_buf()
94 } else {
95 std::env::current_dir()?.join(db_path)
96 };
97
98 let mut commands = String::new();
100
101 let abs_schema_path = schema_path.canonicalize()
103 .with_context(|| format!("Failed to get absolute path: {}", schema_path.display()))?;
104 commands.push_str(&format!(".read '{}'\n", abs_schema_path.display()));
105
106 commands.push_str(".mode csv\n");
108 for table_name in schema.tables.keys() {
109 if !filter.matches(table_name) {
110 continue;
111 }
112 let csv_path = csvdb_dir.join(format!("{}.csv", table_name));
113 if csv_path.exists() {
114 let abs_csv_path = csv_path.canonicalize()
115 .with_context(|| format!("Failed to get absolute path: {}", csv_path.display()))?;
116 commands.push_str(&format!(
118 ".import --skip 1 '{}' {}\n",
119 abs_csv_path.display(),
120 table_name
121 ));
122 }
123 }
124
125 for (table_name, table_schema) in &schema.tables {
128 if !filter.matches(table_name) {
129 continue;
130 }
131 for col in &table_schema.columns {
132 commands.push_str(&format!(
133 "UPDATE \"{}\" SET \"{}\" = NULL WHERE \"{}\" = '\\N';\n",
134 table_name, col.name, col.name
135 ));
136 }
137 }
138
139 let mut child = Command::new("sqlite3")
141 .arg(&abs_db_path)
142 .stdin(std::process::Stdio::piped())
143 .stdout(std::process::Stdio::piped())
144 .stderr(std::process::Stdio::piped())
145 .spawn()
146 .context("Failed to start sqlite3")?;
147
148 if let Some(mut stdin) = child.stdin.take() {
150 stdin.write_all(commands.as_bytes())
151 .context("Failed to write to sqlite3 stdin")?;
152 }
153
154 let output = child.wait_with_output()
155 .context("Failed to run sqlite3")?;
156
157 if !output.status.success() {
158 let stderr = String::from_utf8_lossy(&output.stderr);
159 bail!("sqlite3 import failed: {}", stderr);
160 }
161
162 Ok(db_path.to_path_buf())
163}
164
165fn to_sqlite_via_rusqlite(
167 csvdb_dir: &Path,
168 db_path: &Path,
169 schema_path: &Path,
170 schema: &Schema,
171 filter: &TableFilter,
172) -> Result<PathBuf> {
173 let conn = Connection::open(db_path)
174 .with_context(|| format!("Failed to create database: {}", db_path.display()))?;
175
176 let schema_sql = fs::read_to_string(schema_path)?;
178 for stmt in schema_sql.split(';') {
179 let stmt = stmt.trim();
180 if !stmt.is_empty() {
181 conn.execute(stmt, [])
182 .with_context(|| format!("Failed to execute: {}", stmt))?;
183 }
184 }
185
186 let pb = if std::io::stderr().is_terminal() {
188 let pb = ProgressBar::new(schema.tables.len() as u64);
189 pb.set_style(
190 ProgressStyle::default_bar()
191 .template("[{bar:40}] {pos}/{len} {msg}")
192 .unwrap(),
193 );
194 pb
195 } else {
196 ProgressBar::hidden()
197 };
198 conn.execute("BEGIN TRANSACTION", [])?;
199 for (table_name, table_schema) in &schema.tables {
200 if !filter.matches(table_name) {
201 pb.inc(1);
202 continue;
203 }
204 pb.set_message(table_name.clone());
205 let csv_path = csvdb_dir.join(format!("{}.csv", table_name));
206 if csv_path.exists() {
207 let table = read_table_csv(&csv_path, table_schema)?;
208 table.write_to_sqlite(&conn)?;
209 }
210 pb.inc(1);
211 }
212 conn.execute("COMMIT", [])?;
213 pb.finish_and_clear();
214
215 Ok(db_path.to_path_buf())
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::to_csv::to_csv;
    use crate::{NullMode, OrderMode, TableFilter};
    use tempfile::tempdir;

    /// The permissive filter (two empty lists) used throughout these tests.
    fn no_filter() -> TableFilter {
        TableFilter::new(vec![], vec![])
    }

    /// Creates `<parent>/<name>` as a csvdb directory containing one table `t`
    /// with a single row (`id = 1`), and returns the directory path.
    fn write_single_table_csvdb(parent: &Path, name: &str) -> Result<PathBuf> {
        let csvdb_dir = parent.join(name);
        fs::create_dir(&csvdb_dir)?;
        fs::write(
            csvdb_dir.join("schema.sql"),
            "CREATE TABLE \"t\" (\n  \"id\" INTEGER PRIMARY KEY\n);\n",
        )?;
        fs::write(csvdb_dir.join("t.csv"), "id\n1\n")?;
        Ok(csvdb_dir)
    }

    /// Opens `db_path` just long enough to count the rows in `table`. The
    /// connection is closed before returning, so the file can be replaced
    /// afterwards.
    fn row_count(db_path: &Path, table: &str) -> Result<i64> {
        let conn = Connection::open(db_path)?;
        let count = conn.query_row(
            &format!("SELECT COUNT(*) FROM \"{}\"", table),
            [],
            |r| r.get(0),
        )?;
        Ok(count)
    }

    #[test]
    fn test_roundtrip() -> Result<()> {
        let dir = tempdir()?;
        let db_path = dir.path().join("test.sqlite");

        {
            let conn = Connection::open(&db_path)?;
            conn.execute(
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
                [],
            )?;
            conn.execute("INSERT INTO users VALUES (1, 'Alice')", [])?;
            conn.execute("INSERT INTO users VALUES (2, 'Bob')", [])?;
        }

        let csvdb = to_csv(
            &db_path,
            OrderMode::Pk,
            NullMode::Marker,
            None,
            true,
            &no_filter(),
        )?;
        fs::remove_file(&db_path)?;

        let rebuilt_path = to_sqlite(&csvdb, true, &no_filter())?;
        assert_eq!(row_count(&rebuilt_path, "users")?, 2);

        let conn = Connection::open(&rebuilt_path)?;
        let name: String =
            conn.query_row("SELECT name FROM users WHERE id = 1", [], |r| r.get(0))?;
        assert_eq!(name, "Alice");

        Ok(())
    }

    #[test]
    fn test_output_path_csvdb_suffix() -> Result<()> {
        let dir = tempdir()?;
        let csvdb_dir = write_single_table_csvdb(dir.path(), "foo.csvdb")?;

        let db_path = to_sqlite(&csvdb_dir, true, &no_filter())?;
        assert_eq!(db_path, dir.path().join("foo.sqlite"));
        Ok(())
    }

    #[test]
    fn test_output_path_no_suffix() -> Result<()> {
        let dir = tempdir()?;
        let csvdb_dir = write_single_table_csvdb(dir.path(), "bar")?;

        let db_path = to_sqlite(&csvdb_dir, true, &no_filter())?;
        assert_eq!(db_path, dir.path().join("bar.sqlite"));
        Ok(())
    }

    #[test]
    fn test_force_overwrites() -> Result<()> {
        let dir = tempdir()?;
        let csvdb_dir = write_single_table_csvdb(dir.path(), "f.csvdb")?;
        let target = dir.path().join("f.sqlite");
        fs::write(&target, "not a database")?;

        // The junk file is replaced by a real database.
        let db_path = to_sqlite(&csvdb_dir, true, &no_filter())?;
        assert_eq!(db_path, target);
        assert_eq!(row_count(&db_path, "t")?, 1);

        // A second forced run replaces the database built by the first.
        let db_path2 = to_sqlite(&csvdb_dir, true, &no_filter())?;
        assert_eq!(db_path2, target);
        assert_eq!(row_count(&db_path2, "t")?, 1);
        Ok(())
    }

    #[test]
    fn test_no_force_rejects_existing() -> Result<()> {
        let dir = tempdir()?;
        let csvdb_dir = write_single_table_csvdb(dir.path(), "nf.csvdb")?;

        to_sqlite(&csvdb_dir, true, &no_filter())?;

        let err = to_sqlite(&csvdb_dir, false, &no_filter()).unwrap_err();
        assert!(err.to_string().contains("--force"));

        // The rejected run leaves the existing database untouched.
        assert_eq!(row_count(&dir.path().join("nf.sqlite"), "t")?, 1);
        Ok(())
    }
}