mod batch;
mod cache;
mod loader;
mod output;
mod types;
#[allow(unused_imports)]
pub use batch::{flush_batch, BatchManager, InsertBatch, MAX_ROWS_PER_BATCH};
pub use cache::CacheManager;
pub use loader::DumpLoader;
pub use output::{OutputFormat, QueryResultFormatter};
#[allow(unused_imports)] pub use types::TypeConverter;
use crate::parser::SqlDialect;
use anyhow::{Context, Result};
use duckdb::Connection;
use std::path::{Path, PathBuf};
/// Options controlling how a SQL dump is imported and how queries run.
#[derive(Debug, Clone, Default)]
pub struct QueryConfig {
    /// SQL dialect of the input dump; `None` presumably means auto-detect — confirm in `DumpLoader`.
    pub dialect: Option<SqlDialect>,
    /// When true, `QueryEngine::new` backs the database with a temporary file
    /// on disk instead of holding everything in memory.
    pub disk_mode: bool,
    /// Whether the persisted DuckDB cache may be used — NOTE(review): exact
    /// semantics live in `CacheManager`; confirm there.
    pub cache_enabled: bool,
    /// If set, presumably restricts the import to these table names — confirm in `DumpLoader`.
    pub tables: Option<Vec<String>>,
    /// Passed verbatim to DuckDB via `SET memory_limit = '<value>'` (e.g. "2GB").
    pub memory_limit: Option<String>,
    /// Presumably enables progress reporting during import — confirm in `DumpLoader`.
    pub progress: bool,
}
/// Summary statistics produced by importing a dump (see `DumpLoader`).
#[derive(Debug, Default, Clone)]
pub struct ImportStats {
    /// Number of tables created during the import.
    pub tables_created: usize,
    /// Number of INSERT statements processed — presumably; confirm counting rules in `DumpLoader`.
    pub insert_statements: usize,
    /// Total number of rows inserted across all tables.
    pub rows_inserted: u64,
    /// Statements that were skipped rather than executed — confirm skip criteria in `DumpLoader`.
    pub statements_skipped: usize,
    /// Human-readable warnings accumulated during the import.
    pub warnings: Vec<String>,
    /// Wall-clock duration of the import, in seconds.
    pub duration_secs: f64,
}
impl std::fmt::Display for ImportStats {
    /// Writes a one-line summary of the import: table count, row count,
    /// and elapsed seconds (two decimal places).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let summary = format!(
            "{} tables, {} rows imported in {:.2}s",
            self.tables_created, self.rows_inserted, self.duration_secs
        );
        f.write_str(&summary)
    }
}
/// A fully materialized query result with every cell rendered as a `String`
/// (SQL NULL becomes the literal text "NULL"; see `QueryEngine::query`).
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names, in select-list order.
    pub columns: Vec<String>,
    /// Debug-formatted DuckDB type name for each column.
    pub column_types: Vec<String>,
    /// Row-major cell values: one inner `Vec` per row.
    pub rows: Vec<Vec<String>>,
    /// Wall-clock time spent preparing and executing the query, in seconds.
    pub execution_time_secs: f64,
}
impl QueryResult {
    /// Returns `true` when the result set contains no rows at all.
    pub fn is_empty(&self) -> bool {
        self.row_count() == 0
    }

    /// Number of rows in the result set.
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// Number of columns in the result set.
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }
}
/// DuckDB-backed engine that imports SQL dumps and answers queries over them.
pub struct QueryEngine {
    // Underlying DuckDB connection (in-memory, temp file, or cached file).
    conn: Connection,
    // Configuration captured (cloned) at construction time.
    config: QueryConfig,
    // Stats from the most recent `import_dump`, if one has run.
    import_stats: Option<ImportStats>,
    // Path of the temporary database file when running in disk mode;
    // removed (best-effort) in `Drop`.
    temp_db_path: Option<PathBuf>,
}
impl QueryEngine {
pub fn new(config: &QueryConfig) -> Result<Self> {
let (conn, temp_db_path) = if config.disk_mode {
let temp_dir = std::env::temp_dir();
let temp_path = temp_dir.join(format!("sql-splitter-{}.duckdb", std::process::id()));
let conn = Connection::open(&temp_path)
.context("Failed to create disk-based DuckDB database")?;
(conn, Some(temp_path))
} else {
let conn = Connection::open_in_memory()
.context("Failed to create in-memory DuckDB database")?;
(conn, None)
};
if let Some(ref limit) = config.memory_limit {
conn.execute(&format!("SET memory_limit = '{}'", limit), [])
.context("Failed to set memory limit")?;
}
Ok(Self {
conn,
config: config.clone(),
import_stats: None,
temp_db_path,
})
}
pub fn from_cache(cache_path: &Path, config: &QueryConfig) -> Result<Self> {
let conn = Connection::open(cache_path).context("Failed to open cached DuckDB database")?;
if let Some(ref limit) = config.memory_limit {
conn.execute(&format!("SET memory_limit = '{}'", limit), [])
.context("Failed to set memory limit")?;
}
Ok(Self {
conn,
config: config.clone(),
import_stats: None,
temp_db_path: None,
})
}
pub fn import_dump(&mut self, dump_path: &Path) -> Result<&ImportStats> {
let loader = DumpLoader::new(&self.conn, &self.config);
let stats = loader.load(dump_path)?;
self.import_stats = Some(stats);
Ok(self
.import_stats
.as_ref()
.expect("import_stats was just set"))
}
pub fn query(&self, sql: &str) -> Result<QueryResult> {
let start = std::time::Instant::now();
let mut stmt = self
.conn
.prepare(sql)
.with_context(|| format!("Failed to prepare query: {}", sql))?;
let mut rows_result = stmt
.query([])
.with_context(|| format!("Failed to execute query: {}", sql))?;
let mut rows: Vec<Vec<String>> = Vec::new();
let mut column_count = 0;
while let Some(row) = rows_result.next()? {
if column_count == 0 {
column_count = row.as_ref().column_count();
}
let mut values = Vec::with_capacity(column_count);
for i in 0..column_count {
let value: String = match row.get_ref(i) {
Ok(duckdb::types::ValueRef::Null) => "NULL".to_string(),
Ok(duckdb::types::ValueRef::Boolean(b)) => b.to_string(),
Ok(duckdb::types::ValueRef::TinyInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::SmallInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::Int(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::BigInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::HugeInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::UTinyInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::USmallInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::UInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::UBigInt(n)) => n.to_string(),
Ok(duckdb::types::ValueRef::Float(f)) => f.to_string(),
Ok(duckdb::types::ValueRef::Double(f)) => f.to_string(),
Ok(duckdb::types::ValueRef::Text(s)) => String::from_utf8_lossy(s).to_string(),
Ok(duckdb::types::ValueRef::Blob(b)) => {
format!("<blob {} bytes>", b.len())
}
Ok(duckdb::types::ValueRef::Decimal(d)) => d.to_string(),
Ok(duckdb::types::ValueRef::Timestamp(_, ts)) => {
let secs = ts / 1_000_000;
let nanos = ((ts % 1_000_000) * 1000) as u32;
if let Some(dt) = chrono::DateTime::from_timestamp(secs, nanos) {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
ts.to_string()
}
}
Ok(duckdb::types::ValueRef::Date32(days)) => {
if let Some(date) = chrono::NaiveDate::from_num_days_from_ce_opt(
719163 + days, ) {
date.format("%Y-%m-%d").to_string()
} else {
days.to_string()
}
}
Ok(duckdb::types::ValueRef::Time64(_, micros)) => {
let secs = (micros / 1_000_000) as u32;
let nanos = ((micros % 1_000_000) * 1000) as u32;
if let Some(time) =
chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos)
{
time.format("%H:%M:%S").to_string()
} else {
micros.to_string()
}
}
Ok(other) => format!("{:?}", other),
Err(_) => "ERROR".to_string(),
};
values.push(value);
}
rows.push(values);
}
drop(rows_result);
let column_count = stmt.column_count();
let columns: Vec<String> = (0..column_count)
.map(|i| {
stmt.column_name(i)
.map(|s| s.to_string())
.unwrap_or_else(|_| format!("col{}", i))
})
.collect();
let column_types: Vec<String> = (0..column_count)
.map(|i| format!("{:?}", stmt.column_type(i)))
.collect();
Ok(QueryResult {
columns,
column_types,
rows,
execution_time_secs: start.elapsed().as_secs_f64(),
})
}
pub fn execute(&self, sql: &str) -> Result<usize> {
self.conn
.execute(sql, [])
.with_context(|| format!("Failed to execute: {}", sql))
}
pub fn list_tables(&self) -> Result<Vec<String>> {
let result = self.query("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main' ORDER BY table_name")?;
Ok(result.rows.into_iter().map(|r| r[0].clone()).collect())
}
pub fn describe_table(&self, table: &str) -> Result<QueryResult> {
self.query(&format!("DESCRIBE \"{}\"", table))
}
pub fn import_stats(&self) -> Option<&ImportStats> {
self.import_stats.as_ref()
}
pub fn connection(&self) -> &Connection {
&self.conn
}
}
impl Drop for QueryEngine {
fn drop(&mut self) {
if let Some(ref path) = self.temp_db_path {
let _ = std::fs::remove_file(path);
let wal_path = path.with_extension("duckdb.wal");
let _ = std::fs::remove_file(wal_path);
}
}
}
/// File-size threshold (2 GiB, in bytes) above which disk mode is preferred.
pub const DISK_MODE_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;

/// Returns `true` when `file_size` (in bytes) strictly exceeds
/// [`DISK_MODE_THRESHOLD`], i.e. the dump is too large to import comfortably
/// into an in-memory database.
pub fn should_use_disk_mode(file_size: u64) -> bool {
    DISK_MODE_THRESHOLD < file_size
}