use super::{ExportStats, Exporter};
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use log::info;
use rayon::prelude::*;
use rusqlite::{Connection, params};
use std::path::Path;
pub struct SqliteExporter {
database_url: String,
table_name: String,
overwrite: bool,
append: bool,
conn: Option<Connection>,
stats: ExportStats,
}
impl std::fmt::Debug for SqliteExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SqliteExporter")
.field("database_url", &self.database_url)
.field("table_name", &self.table_name)
.field("overwrite", &self.overwrite)
.field("append", &self.append)
.field("stats", &self.stats)
.finish_non_exhaustive()
}
}
impl SqliteExporter {
#[must_use]
pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
Self {
database_url,
table_name,
overwrite,
append,
conn: None,
stats: ExportStats::new(),
}
}
#[must_use]
pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
Self::new(
config.database_url.clone(),
config.table_name.clone(),
config.overwrite,
config.append,
)
}
fn create_table(&self) -> Result<()> {
let conn = self.conn.as_ref().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let sql = format!(
r"
CREATE TABLE IF NOT EXISTS {} (
ts TEXT NOT NULL,
ep INTEGER NOT NULL,
sess_id TEXT NOT NULL,
thrd_id TEXT NOT NULL,
username TEXT NOT NULL,
trx_id TEXT NOT NULL,
statement TEXT,
appname TEXT,
client_ip TEXT,
tag TEXT,
sql TEXT NOT NULL,
exec_time_ms REAL,
row_count INTEGER,
exec_id INTEGER
)
",
self.table_name
);
conn.execute(&sql, []).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create table: {e}"),
})
})?;
info!("SQLite table created or already exists");
Ok(())
}
}
impl Exporter for SqliteExporter {
fn initialize(&mut self) -> Result<()> {
info!("Initializing SQLite exporter: {}", self.database_url);
let path = Path::new(&self.database_url);
if let Some(parent) = path.parent().filter(|p| !p.exists()) {
std::fs::create_dir_all(parent).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create directory: {e}"),
})
})?;
}
let conn = Connection::open(&self.database_url).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to open database: {e}"),
})
})?;
conn.execute_batch(
"PRAGMA journal_mode = OFF;
PRAGMA synchronous = OFF;
PRAGMA cache_size = 1000000;
PRAGMA locking_mode = EXCLUSIVE;
PRAGMA temp_store = MEMORY;
PRAGMA mmap_size = 30000000000;
PRAGMA page_size = 65536;
PRAGMA threads = 4;",
)
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to set PRAGMAs: {e}"),
})
})?;
self.conn = Some(conn);
if self.overwrite {
let drop_sql = format!("DROP TABLE IF EXISTS {}", self.table_name);
if let Some(conn) = &self.conn {
conn.execute(&drop_sql, []).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to drop table: {e}"),
})
})?;
info!("Dropped existing table: {}", self.table_name);
}
} else if !self.append {
if let Some(conn) = &self.conn {
let delete_sql = format!("DELETE FROM {}", self.table_name);
let _ = conn.execute(&delete_sql, []);
info!("Cleared existing data from table: {}", self.table_name);
}
}
self.create_table()?;
if let Some(conn) = &self.conn {
conn.execute_batch("BEGIN TRANSACTION;").map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to begin transaction: {e}"),
})
})?;
}
info!("SQLite exporter initialized: {}", self.database_url);
Ok(())
}
fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
let conn = self.conn.as_ref().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let sql = format!(
"INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
self.table_name
);
let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to prepare statement: {e}"),
})
})?;
let meta = sqllog.parse_meta();
let indicators = sqllog.parse_indicators();
let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
(
Some(ind.execute_time),
Some(ind.row_count),
Some(ind.execute_id),
)
} else {
(None, None, None)
};
stmt.execute(params![
sqllog.ts,
meta.ep,
meta.sess_id,
meta.thrd_id,
meta.username,
meta.trxid,
meta.statement,
meta.appname,
meta.client_ip,
sqllog.tag,
sqllog.body().as_ref(),
exec_time,
row_count,
exec_id
])
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to insert record: {e}"),
})
})?;
self.stats.record_success();
Ok(())
}
fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
if sqllogs.is_empty() {
return Ok(());
}
let conn = self.conn.as_ref().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let sql = format!(
"INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
self.table_name
);
let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to prepare statement: {e}"),
})
})?;
const CHUNK_SIZE: usize = 500;
for chunk in sqllogs.chunks(CHUNK_SIZE) {
let records: Vec<_> = chunk
.par_iter()
.map(|sqllog| {
let meta = sqllog.parse_meta();
let indicators = sqllog.parse_indicators();
let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
(
Some(ind.execute_time),
Some(ind.row_count),
Some(ind.execute_id),
)
} else {
(None, None, None)
};
(
sqllog.ts.to_string(),
meta.ep,
meta.sess_id.to_string(),
meta.thrd_id.to_string(),
meta.username.to_string(),
meta.trxid.to_string(),
meta.statement.to_string(),
meta.appname.to_string(),
meta.client_ip.to_string(),
sqllog.tag.as_ref().map(ToString::to_string),
sqllog.body().to_string(),
exec_time,
row_count,
exec_id,
)
})
.collect();
for (
ts,
ep,
sess_id,
thrd_id,
username,
trxid,
statement,
appname,
client_ip,
tag,
sql_body,
exec_time,
row_count,
exec_id,
) in records
{
stmt.execute(params![
ts, ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip, tag,
sql_body, exec_time, row_count, exec_id
])
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to insert record: {e}"),
})
})?;
self.stats.record_success();
}
}
Ok(())
}
fn finalize(&mut self) -> Result<()> {
if let Some(conn) = &self.conn {
conn.execute_batch("COMMIT;").map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to commit transaction: {e}"),
})
})?;
}
info!(
"SQLite export finished: {} (success: {}, failed: {})",
self.database_url, self.stats.exported, self.stats.failed
);
Ok(())
}
fn name(&self) -> &'static str {
"SQLite"
}
fn stats_snapshot(&self) -> Option<ExportStats> {
Some(self.stats.clone())
}
}
impl Drop for SqliteExporter {
fn drop(&mut self) {
}
}