use super::{ExportStats, Exporter, csv::CsvExporter};
use crate::config;
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use duckdb::Connection;
use log::{debug, info, warn};
use std::fs;
use std::path::{Path, PathBuf};
/// Exporter that stages records in a temporary CSV file and bulk-loads that
/// file into a DuckDB table when the export is finalized.
pub struct DuckdbExporter {
/// Database location; passed to `Connection::open` and to the `duckdb` CLI.
database_url: String,
/// Target table name; interpolated verbatim into the generated SQL.
table_name: String,
/// When set, the target table is dropped during initialization.
overwrite: bool,
/// When set (and `overwrite` is not), existing rows are kept; when neither
/// flag is set, existing rows are deleted during initialization.
append: bool,
/// In-process connection; opened during initialization and released in
/// `finalize` before the CLI import runs.
conn: Option<Connection>,
/// Running export counters, exposed through `stats_snapshot`.
stats: ExportStats,
/// CSV writer that receives every exported record; taken in `finalize`.
csv_exporter: Option<CsvExporter>,
/// Path of the staging CSV file; cleared once the import has succeeded.
temp_csv_path: Option<PathBuf>,
}
/// Manual `Debug` implementation that prints the configuration and statistics
/// fields only; the remaining fields are elided and shown as `..`.
impl std::fmt::Debug for DuckdbExporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("DuckdbExporter");
        out.field("database_url", &self.database_url);
        out.field("table_name", &self.table_name);
        out.field("overwrite", &self.overwrite);
        out.field("append", &self.append);
        out.field("stats", &self.stats);
        // Signal that not every field is listed.
        out.finish_non_exhaustive()
    }
}
impl DuckdbExporter {
#[must_use]
pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
Self {
database_url,
table_name,
overwrite,
append,
conn: None,
stats: ExportStats::new(),
csv_exporter: None,
temp_csv_path: None,
}
}
#[must_use]
pub fn from_config(config: &config::DuckdbExporter) -> Self {
Self::new(
config.database_url.clone(),
config.table_name.clone(),
config.overwrite,
config.append,
)
}
fn create_table(&self) -> Result<()> {
let conn = self.conn.as_ref().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let create_table_sql = format!(
r"CREATE TABLE IF NOT EXISTS {} (
ts VARCHAR NOT NULL,
ep INTEGER NOT NULL,
sess_id VARCHAR NOT NULL,
thrd_id VARCHAR NOT NULL,
username VARCHAR NOT NULL,
trx_id VARCHAR NOT NULL,
statement VARCHAR NOT NULL,
appname VARCHAR,
client_ip VARCHAR,
sql TEXT NOT NULL,
exec_time_ms FLOAT,
row_count INTEGER,
exec_id BIGINT
)",
self.table_name
);
conn.execute(&create_table_sql, []).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create table: {e}"),
})
})?;
info!("DuckDB table created or already exists");
Ok(())
}
}
impl Exporter for DuckdbExporter {
fn initialize(&mut self) -> Result<()> {
info!("Initializing DuckDB exporter: {}", self.database_url);
let path = Path::new(&self.database_url);
if let Some(parent) = path.parent().filter(|p| !p.exists()) {
fs::create_dir_all(parent).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create directory: {e}"),
})
})?;
}
let conn = Connection::open(&self.database_url).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to open database: {e}"),
})
})?;
self.conn = Some(conn);
if self.overwrite {
info!("Overwrite mode: dropping existing table if it exists");
if let Some(ref conn) = self.conn {
conn.execute(&format!("DROP TABLE IF EXISTS {}", self.table_name), [])
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to drop table: {e}"),
})
})?;
}
} else if !self.append {
info!("Truncate mode: clearing existing table data");
if let Some(ref conn) = self.conn {
conn.execute(&format!("DELETE FROM {}", self.table_name), [])
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to truncate table: {e}"),
})
})?;
}
} else {
info!("Append mode: keeping existing data");
}
self.create_table()?;
let temp_dir = std::env::temp_dir();
let temp_csv_path = temp_dir.join(format!("duckdb_import_{}.csv", std::process::id()));
let mut csv_exporter = CsvExporter::new(&temp_csv_path);
csv_exporter.initialize()?;
self.csv_exporter = Some(csv_exporter);
self.temp_csv_path = Some(temp_csv_path);
info!("DuckDB exporter initialized: {}", self.database_url);
Ok(())
}
fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
if let Some(csv_exporter) = &mut self.csv_exporter {
csv_exporter.export(sqllog)?;
self.stats.record_success();
}
Ok(())
}
fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
debug!("Exporting {} records to DuckDB in batch", sqllogs.len());
if let Some(csv_exporter) = &mut self.csv_exporter {
csv_exporter.export_batch(sqllogs)?;
self.stats.exported += sqllogs.len();
}
Ok(())
}
fn finalize(&mut self) -> Result<()> {
if let Some(mut csv_exporter) = self.csv_exporter.take() {
<CsvExporter as Exporter>::finalize(&mut csv_exporter)?;
}
let csv_path = self.temp_csv_path.as_ref().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "No temporary CSV file".to_string(),
})
})?;
info!(
"Importing {} records from CSV to DuckDB...",
self.stats.exported
);
self.conn = None;
let csv_path_str = csv_path.to_string_lossy().replace('\\', "/");
let sql = format!(
"PRAGMA threads=16; PRAGMA memory_limit='8GB'; SET preserve_insertion_order=false; COPY {} FROM '{}' (HEADER true, DELIMITER ',', PARALLEL true)",
self.table_name,
csv_path_str.replace('\'', "''")
);
let output = std::process::Command::new("duckdb")
.arg(&self.database_url)
.arg("-c")
.arg(&sql)
.output()
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to execute duckdb CLI: {e}"),
})
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(Error::Export(ExportError::DatabaseError {
reason: format!("DuckDB import failed: {stderr}"),
}));
}
info!(
"Successfully imported {} records to DuckDB",
self.stats.exported
);
if csv_path.exists() {
let _ = fs::remove_file(csv_path);
}
self.temp_csv_path = None;
info!(
"DuckDB export finished: {} (success: {}, failed: {})",
self.database_url, self.stats.exported, self.stats.failed
);
Ok(())
}
fn name(&self) -> &'static str {
"DuckDB"
}
fn stats_snapshot(&self) -> Option<ExportStats> {
Some(self.stats.clone())
}
}
impl Drop for DuckdbExporter {
fn drop(&mut self) {
if self.csv_exporter.is_some()
&& let Err(e) = self.finalize()
{
warn!("DuckDB exporter finalization on Drop failed: {e}");
}
}
}