use super::{ExportStats, Exporter, csv::CsvExporter};
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use log::{debug, info, warn};
use postgres::{Client, NoTls};
use tempfile::NamedTempFile;
/// Exporter that loads SQL log records into a PostgreSQL table.
///
/// Records are staged in a temporary CSV file through a [`CsvExporter`];
/// the flush step then invokes the `psql` command-line client with a
/// `\COPY` command to load that file into `schema.table_name`.
pub struct PostgresExporter {
/// Connection string passed to `Client::connect` during initialization.
connection_string: String,
/// Server host, passed to `psql` as `-h`.
host: String,
/// Server port, passed to `psql` as `-p`.
port: u16,
/// Login role, passed to `psql` as `-U`.
username: String,
/// Password; exported to `psql` as `PGPASSWORD` when non-empty.
password: String,
/// Database name, passed to `psql` as `-d`.
database: String,
/// Schema part of the target table name.
schema: String,
/// Table part of the target table name.
table_name: String,
/// When `true`, initialization drops the target table before creating it.
overwrite: bool,
/// When `false` (and `overwrite` is `false`), initialization issues a
/// `DELETE FROM` against the target table.
append: bool,
/// Database connection; `None` until initialization has connected.
client: Option<Client>,
/// Export counters reported by `stats_snapshot`.
stats: ExportStats,
/// CSV writer used to stage records; created during initialization.
csv_exporter: Option<CsvExporter>,
/// Temporary file backing `csv_exporter`; taken out by the flush step.
temp_csv: Option<NamedTempFile>,
}
/// Debug output restricted to connection metadata and statistics.
///
/// `password` and `connection_string` are not printed (presumably to keep
/// credentials out of logs); the non-exhaustive finish marks the output as
/// partial.
impl std::fmt::Debug for PostgresExporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PostgresExporter");
        out.field("host", &self.host);
        out.field("port", &self.port);
        out.field("username", &self.username);
        out.field("database", &self.database);
        out.field("schema", &self.schema);
        out.field("table_name", &self.table_name);
        out.field("stats", &self.stats);
        out.finish_non_exhaustive()
    }
}
impl PostgresExporter {
/// Builds an exporter from explicit connection and table settings.
///
/// Nothing is connected or created here: the database client, the CSV
/// writer and the temporary file all start out as `None`, and the
/// statistics start from `ExportStats::new()`.
#[must_use]
pub fn new(
    connection_string: String,
    host: String,
    port: u16,
    username: String,
    password: String,
    database: String,
    schema: String,
    table_name: String,
    overwrite: bool,
    append: bool,
) -> Self {
    let stats = ExportStats::new();
    Self {
        // Connection settings.
        connection_string,
        host,
        port,
        username,
        password,
        database,
        // Target table and load mode.
        schema,
        table_name,
        overwrite,
        append,
        // Runtime state, populated later.
        client: None,
        stats,
        csv_exporter: None,
        temp_csv: None,
    }
}
/// Builds an exporter from a `crate::config::PostgresExporter` value.
///
/// String settings are cloned out of `config`; the connection string is
/// obtained from `config.connection_string()`.
#[must_use]
pub fn from_config(config: &crate::config::PostgresExporter) -> Self {
    let connection_string = config.connection_string();
    let host = config.host.clone();
    let port = config.port;
    let username = config.username.clone();
    let password = config.password.clone();
    let database = config.database.clone();
    let schema = config.schema.clone();
    let table_name = config.table_name.clone();
    Self::new(
        connection_string,
        host,
        port,
        username,
        password,
        database,
        schema,
        table_name,
        config.overwrite,
        config.append,
    )
}
/// Returns the target table as `schema.table_name`.
///
/// The two parts are joined verbatim; no identifier quoting is applied.
fn full_table_name(&self) -> String {
    [self.schema.as_str(), self.table_name.as_str()].join(".")
}
fn create_table(&mut self) -> Result<()> {
let full_table_name = self.full_table_name();
let client = self.client.as_mut().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let sql = format!(
r"
CREATE UNLOGGED TABLE IF NOT EXISTS {full_table_name} (
ts VARCHAR,
ep INTEGER,
sess_id VARCHAR,
thrd_id VARCHAR,
username VARCHAR,
trx_id VARCHAR,
statement VARCHAR,
appname VARCHAR,
client_ip VARCHAR,
sql TEXT,
exec_time_ms REAL,
row_count INTEGER,
exec_id BIGINT
)
"
);
client.execute(&sql, &[]).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create table: {e}"),
})
})?;
info!("PostgreSQL table created or already exists");
Ok(())
}
/// Bulk-loads the staged CSV file into the target table via `psql \COPY`.
///
/// The CSV writer (if any) is finalized first so the file is complete, then
/// the temporary file is taken out of `self` and handed to an external
/// `psql` process. The temporary file is owned by this call and is dropped
/// when it returns, whether or not the import succeeded.
///
/// # Errors
///
/// Returns an error when finalizing the CSV writer fails, when there is no
/// temporary CSV file to import, when `psql` cannot be started, or when
/// `psql` exits with a non-zero status (its stderr is included in the
/// error reason).
fn flush(&mut self) -> Result<()> {
    if let Some(csv_exporter) = &mut self.csv_exporter {
        <CsvExporter as Exporter>::finalize(csv_exporter)?;
    }
    let temp_csv = self.temp_csv.take().ok_or_else(|| {
        Error::Export(ExportError::DatabaseError {
            reason: "No temporary CSV file".to_string(),
        })
    })?;
    let full_table_name = self.full_table_name();
    // Backslashes become forward slashes, and single quotes are doubled
    // because the path is embedded in a single-quoted `\COPY` argument.
    let csv_path = temp_csv
        .path()
        .to_string_lossy()
        .replace('\\', "/")
        .replace('\'', "''");
    info!("Starting CSV import into PostgreSQL via psql COPY for table: {full_table_name}");
    let copy_sql = format!(
        "\\COPY {full_table_name} (ts, ep, sess_id, thrd_id, username, trx_id, statement, appname, client_ip, sql, exec_time_ms, row_count, exec_id) FROM '{csv_path}' WITH (FORMAT CSV, HEADER true)"
    );
    let mut cmd = std::process::Command::new("psql");
    cmd.arg("-h")
        .arg(&self.host)
        .arg("-p")
        .arg(self.port.to_string())
        .arg("-U")
        .arg(&self.username)
        .arg("-d")
        .arg(&self.database)
        .arg("-c")
        .arg(&copy_sql);
    // The password travels through the environment rather than the
    // argument list.
    if !self.password.is_empty() {
        cmd.env("PGPASSWORD", &self.password);
    }
    let output = cmd.output().map_err(|e| {
        Error::Export(ExportError::DatabaseError {
            reason: format!("Failed to execute psql: {e}"),
        })
    })?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(Error::Export(ExportError::DatabaseError {
            reason: format!("PostgreSQL import failed: {stderr}"),
        }));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    info!("PostgreSQL import completed: {}", stdout.trim());
    self.stats.flush_operations += 1;
    self.stats.last_flush_size = self.stats.exported;
    Ok(())
}
}
impl Exporter for PostgresExporter {
fn initialize(&mut self) -> Result<()> {
info!("Initializing PostgreSQL exporter");
debug!("Connection string: {}", self.connection_string);
let mut client = Client::connect(&self.connection_string, NoTls).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to connect to database: {e}"),
})
})?;
let _ = client.execute("SET synchronous_commit = OFF", &[]);
let _ = client.execute("SET maintenance_work_mem = '2GB'", &[]);
let _ = client.execute("SET work_mem = '512MB'", &[]);
let _ = client.execute("SET max_parallel_workers_per_gather = 8", &[]);
let _ = client.execute("SET max_parallel_workers = 16", &[]);
let _ = client.execute("SET shared_buffers = '2GB'", &[]);
self.client = Some(client);
if self.overwrite {
let full_table_name = self.full_table_name();
if let Some(client) = &mut self.client {
let drop_sql = format!("DROP TABLE IF EXISTS {full_table_name}");
client.execute(&drop_sql, &[]).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to drop table: {e}"),
})
})?;
info!("Dropped existing table: {full_table_name}");
}
} else if !self.append {
let full_table_name = self.full_table_name();
if let Some(client) = &mut self.client {
let delete_sql = format!("DELETE FROM {full_table_name}");
let _ = client.execute(&delete_sql, &[]);
info!("Cleared existing data from table: {full_table_name}");
}
}
self.create_table()?;
let temp_csv = NamedTempFile::new_in("export")
.map_err(|e| {
NamedTempFile::new().map_err(|e2| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create temp CSV file: {e} ({e2})"),
})
})
})
.or_else(|_| {
NamedTempFile::new().map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create temp CSV file: {e}"),
})
})
})?;
let csv_exporter = CsvExporter::new(temp_csv.path());
self.csv_exporter = Some(csv_exporter);
self.temp_csv = Some(temp_csv);
if let Some(csv_exporter) = &mut self.csv_exporter {
csv_exporter.initialize()?;
}
info!("PostgreSQL exporter initialized");
Ok(())
}
/// Stages a single record in the temporary CSV file.
///
/// The success counter is only advanced when a CSV writer exists and
/// accepted the record, so the statistics never count records that were
/// not actually staged (matching `export_batch`). If the exporter has not
/// been initialized, the call is a no-op.
///
/// # Errors
///
/// Propagates any error returned by the CSV writer.
fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
    if let Some(csv_exporter) = &mut self.csv_exporter {
        csv_exporter.export(sqllog)?;
        self.stats.record_success();
    }
    Ok(())
}
/// Stages a batch of records in the temporary CSV file.
///
/// When a CSV writer exists, the batch is forwarded to it and the exported
/// counter grows by the batch length; otherwise nothing is written or
/// counted.
///
/// # Errors
///
/// Propagates any error returned by the CSV writer.
fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
    let batch_len = sqllogs.len();
    debug!("Exporting {} records to PostgreSQL in batch", batch_len);
    let Some(writer) = self.csv_exporter.as_mut() else {
        return Ok(());
    };
    writer.export_batch(sqllogs)?;
    self.stats.exported += batch_len;
    Ok(())
}
fn finalize(&mut self) -> Result<()> {
self.flush()?;
self.csv_exporter = None;
self.temp_csv = None;
info!(
"PostgreSQL export finished (success: {}, failed: {})",
self.stats.exported, self.stats.failed
);
Ok(())
}
/// Human-readable name of this exporter.
fn name(&self) -> &'static str {
    const EXPORTER_NAME: &str = "PostgreSQL";
    EXPORTER_NAME
}
/// Returns a copy of the current export statistics (always `Some`).
fn stats_snapshot(&self) -> Option<ExportStats> {
    let snapshot = self.stats.clone();
    Some(snapshot)
}
}
impl Drop for PostgresExporter {
fn drop(&mut self) {
if self.csv_exporter.is_some()
&& self.temp_csv.is_some()
&& let Err(e) = self.finalize()
{
warn!("PostgreSQL exporter finalization on Drop failed: {e}");
}
}
}