mod batch;
mod cache;
mod loader;
mod output;
mod types;

#[allow(unused_imports)]
pub use batch::{flush_batch, BatchManager, InsertBatch, MAX_ROWS_PER_BATCH};
pub use cache::CacheManager;
pub use loader::DumpLoader;
pub use output::{OutputFormat, QueryResultFormatter};
#[allow(unused_imports)]
pub use types::TypeConverter;

use crate::parser::SqlDialect;
use anyhow::{Context, Result};
use duckdb::Connection;
use std::path::{Path, PathBuf};

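/// Options controlling how a dump is imported and queried.
///
/// # Example
///
/// A minimal sketch of a typical configuration; the values shown are
/// illustrative and the block is marked `ignore` because the crate path is
/// not part of this module.
///
/// ```ignore
/// let config = QueryConfig {
///     cache_enabled: true,
///     memory_limit: Some("4GB".to_string()),
///     ..QueryConfig::default()
/// };
/// ```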
#[derive(Debug, Clone, Default)]
pub struct QueryConfig {
    /// SQL dialect of the dump, if known ahead of time.
    pub dialect: Option<SqlDialect>,
    /// Back the DuckDB database with a temporary file instead of memory.
    pub disk_mode: bool,
    /// Whether a cached database may be created or reused.
    pub cache_enabled: bool,
    /// Optional subset of tables to import.
    pub tables: Option<Vec<String>>,
    /// DuckDB memory limit (e.g. `"4GB"`), applied via `SET memory_limit`.
    pub memory_limit: Option<String>,
    /// Emit progress output while importing.
    pub progress: bool,
}

#[derive(Debug, Default, Clone)]
pub struct ImportStats {
    /// Number of tables created during import.
    pub tables_created: usize,
    /// Number of INSERT statements executed.
    pub insert_statements: usize,
    /// Total number of rows inserted.
    pub rows_inserted: u64,
    /// Number of statements that were skipped.
    pub statements_skipped: usize,
    /// Non-fatal problems encountered during import.
    pub warnings: Vec<String>,
    /// Wall-clock import time in seconds.
    pub duration_secs: f64,
}

impl std::fmt::Display for ImportStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} tables, {} rows imported in {:.2}s",
            self.tables_created, self.rows_inserted, self.duration_secs
        )
    }
}

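/// Result of a single SQL query: column names, DuckDB column types, and all
/// rows rendered as strings.
///
/// # Example
///
/// A sketch of consuming a result returned by `QueryEngine::query`
/// (marked `ignore`; assumes `result` is a populated `QueryResult`):
///
/// ```ignore
/// if !result.is_empty() {
///     println!("{}", result.columns.join("\t"));
///     for row in &result.rows {
///         println!("{}", row.join("\t"));
///     }
/// }
/// ```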
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names in result order.
    pub columns: Vec<String>,
    /// DuckDB type name for each column.
    pub column_types: Vec<String>,
    /// Row values, already converted to display strings.
    pub rows: Vec<Vec<String>>,
    /// Query execution time in seconds.
    pub execution_time_secs: f64,
}

impl QueryResult {
    /// Returns `true` if the query produced no rows.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Number of rows in the result.
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// Number of columns in the result.
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }
}

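/// Query engine backed by DuckDB, either in-memory or on disk.
///
/// # Example
///
/// A minimal end-to-end sketch; the dump path and table name are
/// illustrative, and the block is marked `ignore` because it needs a real
/// dump file:
///
/// ```ignore
/// let config = QueryConfig::default();
/// let mut engine = QueryEngine::new(&config)?;
/// engine.import_dump(Path::new("dump.sql"))?;
/// let result = engine.query("SELECT COUNT(*) FROM users")?;
/// println!("{} rows", result.row_count());
/// ```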
pub struct QueryEngine {
    conn: Connection,
    config: QueryConfig,
    import_stats: Option<ImportStats>,
    /// Path of the temporary database file, if running in disk mode.
    temp_db_path: Option<PathBuf>,
}

impl QueryEngine {
    /// Create a new engine: in-memory by default, or backed by a temporary
    /// file in the system temp directory when `config.disk_mode` is set.
    pub fn new(config: &QueryConfig) -> Result<Self> {
        let (conn, temp_db_path) = if config.disk_mode {
            let temp_dir = std::env::temp_dir();
            let temp_path = temp_dir.join(format!("sql-splitter-{}.duckdb", std::process::id()));
            let conn = Connection::open(&temp_path)
                .context("Failed to create disk-based DuckDB database")?;
            (conn, Some(temp_path))
        } else {
            let conn = Connection::open_in_memory()
                .context("Failed to create in-memory DuckDB database")?;
            (conn, None)
        };

        if let Some(ref limit) = config.memory_limit {
            conn.execute(&format!("SET memory_limit = '{}'", limit), [])
                .context("Failed to set memory limit")?;
        }

        Ok(Self {
            conn,
            config: config.clone(),
            import_stats: None,
            temp_db_path,
        })
    }

    /// Open an engine over an existing cached DuckDB database file.
    pub fn from_cache(cache_path: &Path, config: &QueryConfig) -> Result<Self> {
        let conn = Connection::open(cache_path).context("Failed to open cached DuckDB database")?;

        if let Some(ref limit) = config.memory_limit {
            conn.execute(&format!("SET memory_limit = '{}'", limit), [])
                .context("Failed to set memory limit")?;
        }

        Ok(Self {
            conn,
            config: config.clone(),
            import_stats: None,
            temp_db_path: None,
        })
    }

    /// Parse and load a SQL dump into DuckDB, returning the import statistics.
    pub fn import_dump(&mut self, dump_path: &Path) -> Result<&ImportStats> {
        let loader = DumpLoader::new(&self.conn, &self.config);
        let stats = loader.load(dump_path)?;
        self.import_stats = Some(stats);
        Ok(self
            .import_stats
            .as_ref()
            .expect("import_stats was just set"))
    }

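    /// Run a SQL query and collect the full result set as strings.
    ///
    /// A small usage sketch (marked `ignore`; the table name is illustrative):
    ///
    /// ```ignore
    /// let result = engine.query("SELECT id, name FROM customers LIMIT 10")?;
    /// assert_eq!(result.column_count(), 2);
    /// ```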
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let start = std::time::Instant::now();

        let mut stmt = self
            .conn
            .prepare(sql)
            .with_context(|| format!("Failed to prepare query: {}", sql))?;

        let mut rows_result = stmt
            .query([])
            .with_context(|| format!("Failed to execute query: {}", sql))?;

        let mut rows: Vec<Vec<String>> = Vec::new();
        let mut column_count = 0;

        while let Some(row) = rows_result.next()? {
            if column_count == 0 {
                column_count = row.as_ref().column_count();
            }

            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                let value: String = match row.get_ref(i) {
                    Ok(duckdb::types::ValueRef::Null) => "NULL".to_string(),
                    Ok(duckdb::types::ValueRef::Boolean(b)) => b.to_string(),
                    Ok(duckdb::types::ValueRef::TinyInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::SmallInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::Int(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::BigInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::HugeInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::UTinyInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::USmallInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::UInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::UBigInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::Float(f)) => f.to_string(),
                    Ok(duckdb::types::ValueRef::Double(f)) => f.to_string(),
                    Ok(duckdb::types::ValueRef::Text(s)) => String::from_utf8_lossy(s).to_string(),
                    Ok(duckdb::types::ValueRef::Blob(b)) => {
                        format!("<blob {} bytes>", b.len())
                    }
                    Ok(duckdb::types::ValueRef::Decimal(d)) => d.to_string(),
                    Ok(duckdb::types::ValueRef::Timestamp(_, ts)) => {
                        // The raw value is treated as microseconds since the Unix epoch.
                        let secs = ts / 1_000_000;
                        let nanos = ((ts % 1_000_000) * 1000) as u32;
                        if let Some(dt) = chrono::DateTime::from_timestamp(secs, nanos) {
                            dt.format("%Y-%m-%d %H:%M:%S").to_string()
                        } else {
                            ts.to_string()
                        }
                    }
                    Ok(duckdb::types::ValueRef::Date32(days)) => {
                        // Date32 counts days since 1970-01-01; 719_163 is the offset
                        // from chrono's Common Era day numbering to that epoch.
                        if let Some(date) =
                            chrono::NaiveDate::from_num_days_from_ce_opt(719_163 + days)
                        {
                            date.format("%Y-%m-%d").to_string()
                        } else {
                            days.to_string()
                        }
                    }
                    Ok(duckdb::types::ValueRef::Time64(_, micros)) => {
                        let secs = (micros / 1_000_000) as u32;
                        let nanos = ((micros % 1_000_000) * 1000) as u32;
                        if let Some(time) =
                            chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos)
                        {
                            time.format("%H:%M:%S").to_string()
                        } else {
                            micros.to_string()
                        }
                    }
                    Ok(other) => format!("{:?}", other),
                    Err(_) => "ERROR".to_string(),
                };
                values.push(value);
            }
            rows.push(values);
        }

        // Release the row iterator so the statement can be queried for metadata.
        drop(rows_result);

        let column_count = stmt.column_count();
        let columns: Vec<String> = (0..column_count)
            .map(|i| {
                stmt.column_name(i)
                    .map(|s| s.to_string())
                    .unwrap_or_else(|_| format!("col{}", i))
            })
            .collect();

        let column_types: Vec<String> = (0..column_count)
            .map(|i| format!("{:?}", stmt.column_type(i)))
            .collect();

        Ok(QueryResult {
            columns,
            column_types,
            rows,
            execution_time_secs: start.elapsed().as_secs_f64(),
        })
    }

    /// Execute a statement that returns no result set, yielding the number of
    /// affected rows.
    pub fn execute(&self, sql: &str) -> Result<usize> {
        self.conn
            .execute(sql, [])
            .with_context(|| format!("Failed to execute: {}", sql))
    }

    /// List the user tables in the main schema, sorted by name.
    pub fn list_tables(&self) -> Result<Vec<String>> {
        let result = self.query("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main' ORDER BY table_name")?;
        Ok(result.rows.into_iter().map(|r| r[0].clone()).collect())
    }

    /// Describe the columns of a table via DuckDB's `DESCRIBE`.
    pub fn describe_table(&self, table: &str) -> Result<QueryResult> {
        self.query(&format!("DESCRIBE \"{}\"", table))
    }

    /// Statistics from the most recent `import_dump` call, if any.
    pub fn import_stats(&self) -> Option<&ImportStats> {
        self.import_stats.as_ref()
    }

    /// Borrow the underlying DuckDB connection.
    pub fn connection(&self) -> &Connection {
        &self.conn
    }

    /// Export the database (schema and data) using DuckDB's `EXPORT DATABASE`.
    pub fn save_to_file(&self, path: &Path) -> Result<()> {
        self.conn
            .execute(&format!("EXPORT DATABASE '{}'", path.display()), [])
            .context("Failed to export database")?;
        Ok(())
    }
}

impl Drop for QueryEngine {
    fn drop(&mut self) {
        // Remove the temporary on-disk database (and its WAL file) when disk
        // mode was used; errors are ignored since this is best-effort cleanup.
        if let Some(ref path) = self.temp_db_path {
            let _ = std::fs::remove_file(path);
            let wal_path = path.with_extension("duckdb.wal");
            let _ = std::fs::remove_file(wal_path);
        }
    }
}

/// Dump files larger than this many bytes (2 GiB) should be imported into an
/// on-disk database rather than an in-memory one.
pub const DISK_MODE_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;

/// Returns `true` if a dump of `file_size` bytes should use disk mode.
pub fn should_use_disk_mode(file_size: u64) -> bool {
    file_size > DISK_MODE_THRESHOLD
}
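
// A small sanity-check sketch for the disk-mode heuristic above; it assumes
// the strict greater-than comparison at the threshold is intentional.
#[cfg(test)]
mod disk_mode_threshold_tests {
    use super::{should_use_disk_mode, DISK_MODE_THRESHOLD};

    #[test]
    fn threshold_is_exclusive() {
        // Files at or below 2 GiB stay in memory.
        assert!(!should_use_disk_mode(0));
        assert!(!should_use_disk_mode(DISK_MODE_THRESHOLD));
        // Anything strictly larger switches to the on-disk database.
        assert!(should_use_disk_mode(DISK_MODE_THRESHOLD + 1));
    }
}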