dm_database_sqllog2db/exporter/
postgres.rs1use super::{ExportStats, Exporter, csv::CsvExporter};
2use crate::error::{Error, ExportError, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::{debug, info, warn};
5use postgres::{Client, NoTls};
6use tempfile::NamedTempFile;
7
8pub struct PostgresExporter {
10 connection_string: String,
11 host: String,
12 port: u16,
13 username: String,
14 password: String,
15 database: String,
16 schema: String,
17 table_name: String,
18 overwrite: bool,
19 append: bool,
20 client: Option<Client>,
21 stats: ExportStats,
22 csv_exporter: Option<CsvExporter>,
23 temp_csv: Option<NamedTempFile>,
24}
25
26impl std::fmt::Debug for PostgresExporter {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("PostgresExporter")
29 .field("host", &self.host)
30 .field("port", &self.port)
31 .field("username", &self.username)
32 .field("database", &self.database)
33 .field("schema", &self.schema)
34 .field("table_name", &self.table_name)
35 .field("stats", &self.stats)
36 .finish_non_exhaustive()
37 }
38}
39
40impl PostgresExporter {
41 #[must_use]
43 pub fn new(
44 connection_string: String,
45 host: String,
46 port: u16,
47 username: String,
48 password: String,
49 database: String,
50 schema: String,
51 table_name: String,
52 overwrite: bool,
53 append: bool,
54 ) -> Self {
55 Self {
56 connection_string,
57 host,
58 port,
59 username,
60 password,
61 database,
62 schema,
63 table_name,
64 overwrite,
65 append,
66 client: None,
67 stats: ExportStats::new(),
68 csv_exporter: None,
69 temp_csv: None,
70 }
71 }
72
73 #[must_use]
75 pub fn from_config(config: &crate::config::PostgresExporter) -> Self {
76 Self::new(
77 config.connection_string(),
78 config.host.clone(),
79 config.port,
80 config.username.clone(),
81 config.password.clone(),
82 config.database.clone(),
83 config.schema.clone(),
84 config.table_name.clone(),
85 config.overwrite,
86 config.append,
87 )
88 }
89
90 fn full_table_name(&self) -> String {
92 format!("{}.{}", self.schema, self.table_name)
93 }
94
95 fn create_table(&mut self) -> Result<()> {
97 let full_table_name = self.full_table_name();
98 let client = self.client.as_mut().ok_or_else(|| {
99 Error::Export(ExportError::DatabaseError {
100 reason: "Connection not initialized".to_string(),
101 })
102 })?;
103
104 let sql = format!(
105 r"
106 CREATE UNLOGGED TABLE IF NOT EXISTS {full_table_name} (
107 ts VARCHAR,
108 ep INTEGER,
109 sess_id VARCHAR,
110 thrd_id VARCHAR,
111 username VARCHAR,
112 trx_id VARCHAR,
113 statement VARCHAR,
114 appname VARCHAR,
115 client_ip VARCHAR,
116 sql TEXT,
117 exec_time_ms REAL,
118 row_count INTEGER,
119 exec_id BIGINT
120 )
121 "
122 );
123
124 client.execute(&sql, &[]).map_err(|e| {
125 Error::Export(ExportError::DatabaseError {
126 reason: format!("Failed to create table: {e}"),
127 })
128 })?;
129
130 info!("PostgreSQL table created or already exists");
131 Ok(())
132 }
133
134 fn flush(&mut self) -> Result<()> {
136 if let Some(csv_exporter) = &mut self.csv_exporter {
138 <CsvExporter as Exporter>::finalize(csv_exporter)?;
139 }
140
141 let temp_csv = self.temp_csv.take().ok_or_else(|| {
142 Error::Export(ExportError::DatabaseError {
143 reason: "No temporary CSV file".to_string(),
144 })
145 })?;
146
147 let full_table_name = self.full_table_name();
148 let csv_path = temp_csv.path().to_string_lossy().replace('\\', "/");
149
150 info!("Starting CSV import into PostgreSQL via psql COPY for table: {full_table_name}");
151
152 let copy_sql = format!(
154 "\\COPY {full_table_name} (ts, ep, sess_id, thrd_id, username, trx_id, statement, appname, client_ip, sql, exec_time_ms, row_count, exec_id) FROM '{csv_path}' WITH (FORMAT CSV, HEADER true)",
155 csv_path = csv_path.replace('\'', "''")
156 );
157
158 let mut cmd = std::process::Command::new("psql");
159 cmd.arg("-h")
160 .arg(&self.host)
161 .arg("-p")
162 .arg(self.port.to_string())
163 .arg("-U")
164 .arg(&self.username)
165 .arg("-d")
166 .arg(&self.database)
167 .arg("-c")
168 .arg(©_sql);
169
170 if !self.password.is_empty() {
172 cmd.env("PGPASSWORD", &self.password);
173 }
174
175 let output = cmd.output().map_err(|e| {
176 Error::Export(ExportError::DatabaseError {
177 reason: format!("Failed to execute psql: {e}"),
178 })
179 })?;
180
181 if !output.status.success() {
182 let stderr = String::from_utf8_lossy(&output.stderr);
183 return Err(Error::Export(ExportError::DatabaseError {
184 reason: format!("PostgreSQL import failed: {stderr}"),
185 }));
186 }
187
188 let stdout = String::from_utf8_lossy(&output.stdout);
189 info!("PostgreSQL import completed: {}", stdout.trim());
190
191 self.stats.flush_operations += 1;
192 self.stats.last_flush_size = self.stats.exported;
193
194 Ok(())
195 }
196}
197
198impl Exporter for PostgresExporter {
199 fn initialize(&mut self) -> Result<()> {
200 info!("Initializing PostgreSQL exporter");
201
202 debug!("Connection string: {}", self.connection_string);
204
205 let mut client = Client::connect(&self.connection_string, NoTls).map_err(|e| {
207 Error::Export(ExportError::DatabaseError {
208 reason: format!("Failed to connect to database: {e}"),
209 })
210 })?;
211
212 let _ = client.execute("SET synchronous_commit = OFF", &[]);
214 let _ = client.execute("SET maintenance_work_mem = '2GB'", &[]);
215 let _ = client.execute("SET work_mem = '512MB'", &[]);
216 let _ = client.execute("SET max_parallel_workers_per_gather = 8", &[]);
217 let _ = client.execute("SET max_parallel_workers = 16", &[]);
218 let _ = client.execute("SET shared_buffers = '2GB'", &[]);
219
220 self.client = Some(client);
221
222 if self.overwrite {
224 let full_table_name = self.full_table_name();
226 if let Some(client) = &mut self.client {
227 let drop_sql = format!("DROP TABLE IF EXISTS {full_table_name}");
228 client.execute(&drop_sql, &[]).map_err(|e| {
229 Error::Export(ExportError::DatabaseError {
230 reason: format!("Failed to drop table: {e}"),
231 })
232 })?;
233 info!("Dropped existing table: {full_table_name}");
234 }
235 } else if !self.append {
236 let full_table_name = self.full_table_name();
238 if let Some(client) = &mut self.client {
239 let delete_sql = format!("DELETE FROM {full_table_name}");
240 let _ = client.execute(&delete_sql, &[]);
242 info!("Cleared existing data from table: {full_table_name}");
243 }
244 }
245
246 self.create_table()?;
248
249 let temp_csv = NamedTempFile::new_in("export")
251 .map_err(|e| {
252 NamedTempFile::new().map_err(|e2| {
254 Error::Export(ExportError::DatabaseError {
255 reason: format!("Failed to create temp CSV file: {e} ({e2})"),
256 })
257 })
258 })
259 .or_else(|_| {
260 NamedTempFile::new().map_err(|e| {
261 Error::Export(ExportError::DatabaseError {
262 reason: format!("Failed to create temp CSV file: {e}"),
263 })
264 })
265 })?;
266
267 let csv_exporter = CsvExporter::new(temp_csv.path());
269 self.csv_exporter = Some(csv_exporter);
270 self.temp_csv = Some(temp_csv);
271
272 if let Some(csv_exporter) = &mut self.csv_exporter {
274 csv_exporter.initialize()?;
275 }
276
277 info!("PostgreSQL exporter initialized");
278 Ok(())
279 }
280
281 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
282 if let Some(csv_exporter) = &mut self.csv_exporter {
284 csv_exporter.export(sqllog)?;
285 }
286
287 self.stats.record_success();
288 Ok(())
289 }
290
291 fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
292 debug!("Exporting {} records to PostgreSQL in batch", sqllogs.len());
293
294 if let Some(csv_exporter) = &mut self.csv_exporter {
296 csv_exporter.export_batch(sqllogs)?;
297 self.stats.exported += sqllogs.len();
298 }
299
300 Ok(())
301 }
302
303 fn finalize(&mut self) -> Result<()> {
304 self.flush()?;
306
307 self.csv_exporter = None;
309 self.temp_csv = None;
310
311 info!(
312 "PostgreSQL export finished (success: {}, failed: {})",
313 self.stats.exported, self.stats.failed
314 );
315
316 Ok(())
317 }
318
319 fn name(&self) -> &'static str {
320 "PostgreSQL"
321 }
322
323 fn stats_snapshot(&self) -> Option<ExportStats> {
324 Some(self.stats.clone())
325 }
326}
327
328impl Drop for PostgresExporter {
329 fn drop(&mut self) {
330 if self.csv_exporter.is_some()
332 && self.temp_csv.is_some()
333 && let Err(e) = self.finalize()
334 {
335 warn!("PostgreSQL exporter finalization on Drop failed: {e}");
336 }
337 }
338}