sql_splitter/duckdb/mod.rs

//! DuckDB query engine for SQL dump analytics.
//!
//! This module provides the ability to load SQL dumps into an embedded DuckDB
//! database and execute analytical queries on them.
//!
//! # Features
//!
//! - **No external dependencies**: DuckDB is bundled and compiled into sql-splitter; no separate installation is needed
//! - **Multi-dialect support**: MySQL, PostgreSQL, and SQLite dumps
//! - **Memory management**: Automatically switches to disk mode for large dumps
//! - **Caching**: Optional persistent cache for repeated queries
//!
//! # Example
//!
//! ```ignore
//! use sql_splitter::duckdb::{QueryEngine, QueryConfig, QueryResultFormatter, OutputFormat};
//! use std::path::Path;
//!
//! let config = QueryConfig::default();
//! let mut engine = QueryEngine::new(&config).unwrap();
//! engine.import_dump(Path::new("dump.sql")).unwrap();
//!
//! let result = engine.query("SELECT COUNT(*) FROM users").unwrap();
//! println!("{}", QueryResultFormatter::format(&result, OutputFormat::Table));
//! ```

mod batch;
mod cache;
mod loader;
mod output;
mod types;

// BatchManager is used internally by DumpLoader
#[allow(unused_imports)]
pub use batch::{flush_batch, BatchManager, InsertBatch, MAX_ROWS_PER_BATCH};
pub use cache::CacheManager;
pub use loader::DumpLoader;
pub use output::{OutputFormat, QueryResultFormatter};
#[allow(unused_imports)] // Used in tests
pub use types::TypeConverter;

use crate::parser::SqlDialect;
use anyhow::{Context, Result};
use duckdb::Connection;
use std::path::{Path, PathBuf};

/// Configuration for the query engine
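///
/// # Example
///
/// An illustrative sketch; the field values below are arbitrary, not
/// recommended defaults:
///
/// ```ignore
/// use sql_splitter::duckdb::QueryConfig;
///
/// let config = QueryConfig {
///     disk_mode: true,                         // keep tables on disk instead of in memory
///     memory_limit: Some("4GB".to_string()),   // forwarded to DuckDB's memory_limit setting
///     tables: Some(vec!["users".to_string()]), // only import the `users` table
///     ..QueryConfig::default()
/// };
/// ```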
#[derive(Debug, Clone, Default)]
pub struct QueryConfig {
    /// Source SQL dialect (auto-detected if None)
    pub dialect: Option<SqlDialect>,
    /// Use disk-based storage instead of in-memory
    pub disk_mode: bool,
    /// Enable persistent caching
    pub cache_enabled: bool,
    /// Only import specific tables
    pub tables: Option<Vec<String>>,
    /// Memory limit for DuckDB (e.g., "4GB")
    pub memory_limit: Option<String>,
    /// Show progress during import
    pub progress: bool,
}

/// Statistics from dump import
#[derive(Debug, Default, Clone)]
pub struct ImportStats {
    /// Number of tables created
    pub tables_created: usize,
    /// Number of INSERT statements processed
    pub insert_statements: usize,
    /// Total rows inserted
    pub rows_inserted: u64,
    /// Statements skipped (unsupported)
    pub statements_skipped: usize,
    /// Warnings generated
    pub warnings: Vec<String>,
    /// Import duration in seconds
    pub duration_secs: f64,
}

impl std::fmt::Display for ImportStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} tables, {} rows imported in {:.2}s",
            self.tables_created, self.rows_inserted, self.duration_secs
        )
    }
}

/// Result of a query execution
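///
/// # Example
///
/// A sketch of consuming a result; `result` is assumed to come from
/// [`QueryEngine::query`], and every value has already been rendered as a string:
///
/// ```ignore
/// println!("{} rows, {} columns", result.row_count(), result.column_count());
/// for row in &result.rows {
///     println!("{}", row.join(" | "));
/// }
/// ```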
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names
    pub columns: Vec<String>,
    /// Column types (as strings)
    pub column_types: Vec<String>,
    /// Rows of data (each row is a vector of string values)
    pub rows: Vec<Vec<String>>,
    /// Query execution time in seconds
    pub execution_time_secs: f64,
}

impl QueryResult {
    /// Check if the result is empty
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Get the number of rows
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// Get the number of columns
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }
}

/// The main query engine that wraps DuckDB
pub struct QueryEngine {
    conn: Connection,
    config: QueryConfig,
    import_stats: Option<ImportStats>,
    temp_db_path: Option<PathBuf>,
}

impl QueryEngine {
    /// Create a new query engine with the given configuration
    pub fn new(config: &QueryConfig) -> Result<Self> {
        let (conn, temp_db_path) = if config.disk_mode {
            let temp_dir = std::env::temp_dir();
            let temp_path = temp_dir.join(format!("sql-splitter-{}.duckdb", std::process::id()));
            let conn = Connection::open(&temp_path)
                .context("Failed to create disk-based DuckDB database")?;
            (conn, Some(temp_path))
        } else {
            let conn = Connection::open_in_memory()
                .context("Failed to create in-memory DuckDB database")?;
            (conn, None)
        };

        // Configure memory limit if specified
        if let Some(ref limit) = config.memory_limit {
            conn.execute(&format!("SET memory_limit = '{}'", limit), [])
                .context("Failed to set memory limit")?;
        }

        Ok(Self {
            conn,
            config: config.clone(),
            import_stats: None,
            temp_db_path,
        })
    }

    /// Create a query engine from a cached database file
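    ///
    /// # Example
    ///
    /// A sketch with a hypothetical cache path; in practice the path would
    /// normally come from the caching layer rather than being hard-coded:
    ///
    /// ```ignore
    /// let config = QueryConfig::default();
    /// let engine = QueryEngine::from_cache(Path::new("dump.duckdb"), &config)?;
    /// let tables = engine.list_tables()?;
    /// ```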
    pub fn from_cache(cache_path: &Path, config: &QueryConfig) -> Result<Self> {
        let conn = Connection::open(cache_path).context("Failed to open cached DuckDB database")?;

        if let Some(ref limit) = config.memory_limit {
            conn.execute(&format!("SET memory_limit = '{}'", limit), [])
                .context("Failed to set memory limit")?;
        }

        Ok(Self {
            conn,
            config: config.clone(),
            import_stats: None,
            temp_db_path: None,
        })
    }

    /// Import a SQL dump file into the DuckDB database
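    ///
    /// # Example
    ///
    /// A sketch; `dump.sql` is a placeholder path. The returned [`ImportStats`]
    /// implements `Display`, so it can be printed directly:
    ///
    /// ```ignore
    /// let stats = engine.import_dump(Path::new("dump.sql"))?;
    /// println!("{}", stats); // e.g. "3 tables, 1500 rows imported in 0.42s"
    /// ```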
    pub fn import_dump(&mut self, dump_path: &Path) -> Result<&ImportStats> {
        let loader = DumpLoader::new(&self.conn, &self.config);
        let stats = loader.load(dump_path)?;
        self.import_stats = Some(stats);
        // Safe: we just set import_stats to Some above
        Ok(self
            .import_stats
            .as_ref()
            .expect("import_stats was just set"))
    }

    /// Execute a query and return the results
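    ///
    /// # Example
    ///
    /// A sketch that reads a single scalar out of the result; the table name is
    /// illustrative:
    ///
    /// ```ignore
    /// let result = engine.query("SELECT COUNT(*) FROM users")?;
    /// let count: u64 = result.rows[0][0].parse()?;
    /// println!("{} users in {:.3}s", count, result.execution_time_secs);
    /// ```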
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let start = std::time::Instant::now();

        let mut stmt = self
            .conn
            .prepare(sql)
            .with_context(|| format!("Failed to prepare query: {}", sql))?;

        // Execute the query and collect rows
        let mut rows_result = stmt
            .query([])
            .with_context(|| format!("Failed to execute query: {}", sql))?;

        // Collect all rows first
        let mut rows: Vec<Vec<String>> = Vec::new();
        let mut column_count = 0;

        while let Some(row) = rows_result.next()? {
            // Get column count from first row
            if column_count == 0 {
                column_count = row.as_ref().column_count();
            }

            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                let value: String = match row.get_ref(i) {
                    Ok(duckdb::types::ValueRef::Null) => "NULL".to_string(),
                    Ok(duckdb::types::ValueRef::Boolean(b)) => b.to_string(),
                    Ok(duckdb::types::ValueRef::TinyInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::SmallInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::Int(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::BigInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::HugeInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::UTinyInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::USmallInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::UInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::UBigInt(n)) => n.to_string(),
                    Ok(duckdb::types::ValueRef::Float(f)) => f.to_string(),
                    Ok(duckdb::types::ValueRef::Double(f)) => f.to_string(),
                    Ok(duckdb::types::ValueRef::Text(s)) => String::from_utf8_lossy(s).to_string(),
                    Ok(duckdb::types::ValueRef::Blob(b)) => {
                        format!("<blob {} bytes>", b.len())
                    }
                    Ok(duckdb::types::ValueRef::Decimal(d)) => d.to_string(),
                    Ok(duckdb::types::ValueRef::Timestamp(_, ts)) => {
                        // Convert timestamp to readable format
                        // DuckDB timestamps are microseconds since epoch
                        let secs = ts / 1_000_000;
                        let nanos = ((ts % 1_000_000) * 1000) as u32;
                        if let Some(dt) = chrono::DateTime::from_timestamp(secs, nanos) {
                            dt.format("%Y-%m-%d %H:%M:%S").to_string()
                        } else {
                            ts.to_string()
                        }
                    }
                    Ok(duckdb::types::ValueRef::Date32(days)) => {
                        // Days since epoch (1970-01-01)
                        if let Some(date) = chrono::NaiveDate::from_num_days_from_ce_opt(
                            719163 + days, // 719163 = days from 0001-01-01 to 1970-01-01
                        ) {
                            date.format("%Y-%m-%d").to_string()
                        } else {
                            days.to_string()
                        }
                    }
                    Ok(duckdb::types::ValueRef::Time64(_, micros)) => {
                        let secs = (micros / 1_000_000) as u32;
                        let nanos = ((micros % 1_000_000) * 1000) as u32;
                        if let Some(time) =
                            chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos)
                        {
                            time.format("%H:%M:%S").to_string()
                        } else {
                            micros.to_string()
                        }
                    }
                    Ok(other) => format!("{:?}", other),
                    Err(_) => "ERROR".to_string(),
                };
                values.push(value);
            }
            rows.push(values);
        }

        // Drop the rows iterator to release the mutable borrow
        drop(rows_result);

        // Now get column info from the statement
        let column_count = stmt.column_count();
        let columns: Vec<String> = (0..column_count)
            .map(|i| {
                stmt.column_name(i)
                    .map(|s| s.to_string())
                    .unwrap_or_else(|_| format!("col{}", i))
            })
            .collect();

        // Get column types
        let column_types: Vec<String> = (0..column_count)
            .map(|i| format!("{:?}", stmt.column_type(i)))
            .collect();

        Ok(QueryResult {
            columns,
            column_types,
            rows,
            execution_time_secs: start.elapsed().as_secs_f64(),
        })
    }

    /// Execute a statement that doesn't return results (e.g., CREATE, INSERT)
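    ///
    /// # Example
    ///
    /// A sketch creating an extra scratch table; the statements themselves are
    /// illustrative:
    ///
    /// ```ignore
    /// engine.execute("CREATE TABLE scratch (id INTEGER, note VARCHAR)")?;
    /// engine.execute("INSERT INTO scratch VALUES (1, 'hello')")?;
    /// ```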
    pub fn execute(&self, sql: &str) -> Result<usize> {
        self.conn
            .execute(sql, [])
            .with_context(|| format!("Failed to execute: {}", sql))
    }

    /// Get list of tables in the database
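    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// for table in engine.list_tables()? {
    ///     println!("{}", table);
    /// }
    /// ```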
    pub fn list_tables(&self) -> Result<Vec<String>> {
        let result = self.query("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main' ORDER BY table_name")?;
        Ok(result.rows.into_iter().map(|r| r[0].clone()).collect())
    }

    /// Get schema for a specific table
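    ///
    /// # Example
    ///
    /// A sketch that prints the schema as a table; `users` is a placeholder name:
    ///
    /// ```ignore
    /// let schema = engine.describe_table("users")?;
    /// println!("{}", QueryResultFormatter::format(&schema, OutputFormat::Table));
    /// ```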
    pub fn describe_table(&self, table: &str) -> Result<QueryResult> {
        self.query(&format!("DESCRIBE \"{}\"", table))
    }

    /// Get import statistics (if a dump was imported)
    pub fn import_stats(&self) -> Option<&ImportStats> {
        self.import_stats.as_ref()
    }

    /// Get the underlying DuckDB connection (for advanced use)
    pub fn connection(&self) -> &Connection {
        &self.conn
    }

    /// Save the current database to a file (for caching)
    pub fn save_to_file(&self, path: &Path) -> Result<()> {
        self.conn
            .execute(&format!("EXPORT DATABASE '{}'", path.display()), [])
            .context("Failed to export database")?;
        Ok(())
    }
}

impl Drop for QueryEngine {
    fn drop(&mut self) {
        // Clean up temporary database file if we created one
        if let Some(ref path) = self.temp_db_path {
            let _ = std::fs::remove_file(path);
            // Also try to remove the .wal file if it exists
            let wal_path = path.with_extension("duckdb.wal");
            let _ = std::fs::remove_file(wal_path);
        }
    }
}

/// Threshold for automatic disk mode (2 GB)
pub const DISK_MODE_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;

/// Determine if disk mode should be used based on file size
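///
/// # Example
///
/// A sketch that sizes up the dump before building the engine configuration;
/// `dump.sql` is a placeholder path:
///
/// ```ignore
/// let size = std::fs::metadata("dump.sql")?.len();
/// let config = QueryConfig {
///     disk_mode: should_use_disk_mode(size),
///     ..QueryConfig::default()
/// };
/// ```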
pub fn should_use_disk_mode(file_size: u64) -> bool {
    file_size > DISK_MODE_THRESHOLD
}