Skip to main content

dm_database_sqllog2db/exporter/
postgres.rs

1use super::{ExportStats, Exporter, csv::CsvExporter};
2use crate::error::{Error, ExportError, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::{debug, info, warn};
5use postgres::{Client, NoTls};
6use tempfile::NamedTempFile;
7
/// `PostgreSQL` exporter: records are staged in a temporary CSV file and then
/// bulk-loaded into the target table with `psql`'s `\COPY ... FROM`.
pub struct PostgresExporter {
    /// Connection string passed to `Client::connect` when the exporter is initialized.
    connection_string: String,
    /// Server host, passed to `psql` as `-h`.
    host: String,
    /// Server port, passed to `psql` as `-p`.
    port: u16,
    /// Database user, passed to `psql` as `-U`.
    username: String,
    /// Password; handed to `psql` through the `PGPASSWORD` environment variable when non-empty.
    password: String,
    /// Database name, passed to `psql` as `-d`.
    database: String,
    /// Schema part of the qualified table name (`schema.table_name`).
    schema: String,
    /// Table part of the qualified table name (`schema.table_name`).
    table_name: String,
    /// When `true`, the target table is dropped during initialization before being recreated.
    overwrite: bool,
    /// When `false` (and `overwrite` is `false`), existing rows are deleted during initialization.
    append: bool,
    /// Database connection; `None` until initialization has connected.
    client: Option<Client>,
    /// Running export statistics.
    stats: ExportStats,
    /// CSV exporter that writes records into the temporary file; `None` when not initialized.
    csv_exporter: Option<CsvExporter>,
    /// Temporary file holding the staged CSV data; `None` when not initialized or already imported.
    temp_csv: Option<NamedTempFile>,
}
25
26impl std::fmt::Debug for PostgresExporter {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("PostgresExporter")
29            .field("host", &self.host)
30            .field("port", &self.port)
31            .field("username", &self.username)
32            .field("database", &self.database)
33            .field("schema", &self.schema)
34            .field("table_name", &self.table_name)
35            .field("stats", &self.stats)
36            .finish_non_exhaustive()
37    }
38}
39
40impl PostgresExporter {
41    /// 创建新的 `PostgreSQL` 导出器
42    #[must_use]
43    pub fn new(
44        connection_string: String,
45        host: String,
46        port: u16,
47        username: String,
48        password: String,
49        database: String,
50        schema: String,
51        table_name: String,
52        overwrite: bool,
53        append: bool,
54    ) -> Self {
55        Self {
56            connection_string,
57            host,
58            port,
59            username,
60            password,
61            database,
62            schema,
63            table_name,
64            overwrite,
65            append,
66            client: None,
67            stats: ExportStats::new(),
68            csv_exporter: None,
69            temp_csv: None,
70        }
71    }
72
73    /// 从配置创建 `PostgreSQL` 导出器
74    #[must_use]
75    pub fn from_config(config: &crate::config::PostgresExporter) -> Self {
76        Self::new(
77            config.connection_string(),
78            config.host.clone(),
79            config.port,
80            config.username.clone(),
81            config.password.clone(),
82            config.database.clone(),
83            config.schema.clone(),
84            config.table_name.clone(),
85            config.overwrite,
86            config.append,
87        )
88    }
89
90    /// 获取完整表名
91    fn full_table_name(&self) -> String {
92        format!("{}.{}", self.schema, self.table_name)
93    }
94
95    /// 创建数据库表
96    fn create_table(&mut self) -> Result<()> {
97        let full_table_name = self.full_table_name();
98        let client = self.client.as_mut().ok_or_else(|| {
99            Error::Export(ExportError::DatabaseError {
100                reason: "Connection not initialized".to_string(),
101            })
102        })?;
103
104        let sql = format!(
105            r"
106                CREATE UNLOGGED TABLE IF NOT EXISTS {full_table_name} (
107                    ts VARCHAR,
108                    ep INTEGER,
109                    sess_id VARCHAR,
110                    thrd_id VARCHAR,
111                    username VARCHAR,
112                    trx_id VARCHAR,
113                    statement VARCHAR,
114                    appname VARCHAR,
115                    client_ip VARCHAR,
116                    sql TEXT,
117                    exec_time_ms REAL,
118                    row_count INTEGER,
119                    exec_id BIGINT
120                )
121                "
122        );
123
124        client.execute(&sql, &[]).map_err(|e| {
125            Error::Export(ExportError::DatabaseError {
126                reason: format!("Failed to create table: {e}"),
127            })
128        })?;
129
130        info!("PostgreSQL table created or already exists");
131        Ok(())
132    }
133
134    /// 刷新待处理记录到数据库(使用 psql COPY FROM)
135    fn flush(&mut self) -> Result<()> {
136        // 先刷新 CSV 导出器
137        if let Some(csv_exporter) = &mut self.csv_exporter {
138            <CsvExporter as Exporter>::finalize(csv_exporter)?;
139        }
140
141        let temp_csv = self.temp_csv.take().ok_or_else(|| {
142            Error::Export(ExportError::DatabaseError {
143                reason: "No temporary CSV file".to_string(),
144            })
145        })?;
146
147        let full_table_name = self.full_table_name();
148        let csv_path = temp_csv.path().to_string_lossy().replace('\\', "/");
149
150        info!("Starting CSV import into PostgreSQL via psql COPY for table: {full_table_name}");
151
152        // 使用 psql 命令行工具执行 COPY FROM,比客户端传输快得多
153        let copy_sql = format!(
154            "\\COPY {full_table_name} (ts, ep, sess_id, thrd_id, username, trx_id, statement, appname, client_ip, sql, exec_time_ms, row_count, exec_id) FROM '{csv_path}' WITH (FORMAT CSV, HEADER true)",
155            csv_path = csv_path.replace('\'', "''")
156        );
157
158        let mut cmd = std::process::Command::new("psql");
159        cmd.arg("-h")
160            .arg(&self.host)
161            .arg("-p")
162            .arg(self.port.to_string())
163            .arg("-U")
164            .arg(&self.username)
165            .arg("-d")
166            .arg(&self.database)
167            .arg("-c")
168            .arg(&copy_sql);
169
170        // 如果有密码,通过环境变量传递
171        if !self.password.is_empty() {
172            cmd.env("PGPASSWORD", &self.password);
173        }
174
175        let output = cmd.output().map_err(|e| {
176            Error::Export(ExportError::DatabaseError {
177                reason: format!("Failed to execute psql: {e}"),
178            })
179        })?;
180
181        if !output.status.success() {
182            let stderr = String::from_utf8_lossy(&output.stderr);
183            return Err(Error::Export(ExportError::DatabaseError {
184                reason: format!("PostgreSQL import failed: {stderr}"),
185            }));
186        }
187
188        let stdout = String::from_utf8_lossy(&output.stdout);
189        info!("PostgreSQL import completed: {}", stdout.trim());
190
191        self.stats.flush_operations += 1;
192        self.stats.last_flush_size = self.stats.exported;
193
194        Ok(())
195    }
196}
197
198impl Exporter for PostgresExporter {
199    fn initialize(&mut self) -> Result<()> {
200        info!("Initializing PostgreSQL exporter");
201
202        // 输出连接字符串用于调试
203        debug!("Connection string: {}", self.connection_string);
204
205        // 创建连接
206        let mut client = Client::connect(&self.connection_string, NoTls).map_err(|e| {
207            Error::Export(ExportError::DatabaseError {
208                reason: format!("Failed to connect to database: {e}"),
209            })
210        })?;
211
212        // 优化性能设置
213        let _ = client.execute("SET synchronous_commit = OFF", &[]);
214        let _ = client.execute("SET maintenance_work_mem = '2GB'", &[]);
215        let _ = client.execute("SET work_mem = '512MB'", &[]);
216        let _ = client.execute("SET max_parallel_workers_per_gather = 8", &[]);
217        let _ = client.execute("SET max_parallel_workers = 16", &[]);
218        let _ = client.execute("SET shared_buffers = '2GB'", &[]);
219
220        self.client = Some(client);
221
222        // 处理 overwrite/append 逻辑
223        if self.overwrite {
224            // 如果 overwrite=true,删除已存在的表
225            let full_table_name = self.full_table_name();
226            if let Some(client) = &mut self.client {
227                let drop_sql = format!("DROP TABLE IF EXISTS {full_table_name}");
228                client.execute(&drop_sql, &[]).map_err(|e| {
229                    Error::Export(ExportError::DatabaseError {
230                        reason: format!("Failed to drop table: {e}"),
231                    })
232                })?;
233                info!("Dropped existing table: {full_table_name}");
234            }
235        } else if !self.append {
236            // 如果 overwrite=false 且 append=false,清空表数据
237            let full_table_name = self.full_table_name();
238            if let Some(client) = &mut self.client {
239                let delete_sql = format!("DELETE FROM {full_table_name}");
240                // 尝试清空,如果表不存在则忽略错误
241                let _ = client.execute(&delete_sql, &[]);
242                info!("Cleared existing data from table: {full_table_name}");
243            }
244        }
245
246        // 创建表
247        self.create_table()?;
248
249        // 创建临时 CSV 文件(使用当前目录以避免跨磁盘操作)
250        let temp_csv = NamedTempFile::new_in("export")
251            .map_err(|e| {
252                // 如果 export 目录不存在,使用系统临时目录
253                NamedTempFile::new().map_err(|e2| {
254                    Error::Export(ExportError::DatabaseError {
255                        reason: format!("Failed to create temp CSV file: {e} ({e2})"),
256                    })
257                })
258            })
259            .or_else(|_| {
260                NamedTempFile::new().map_err(|e| {
261                    Error::Export(ExportError::DatabaseError {
262                        reason: format!("Failed to create temp CSV file: {e}"),
263                    })
264                })
265            })?;
266
267        // 创建 CSV 导出器
268        let csv_exporter = CsvExporter::new(temp_csv.path());
269        self.csv_exporter = Some(csv_exporter);
270        self.temp_csv = Some(temp_csv);
271
272        // 初始化 CSV 导出器
273        if let Some(csv_exporter) = &mut self.csv_exporter {
274            csv_exporter.initialize()?;
275        }
276
277        info!("PostgreSQL exporter initialized");
278        Ok(())
279    }
280
281    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
282        // 导出到临时 CSV
283        if let Some(csv_exporter) = &mut self.csv_exporter {
284            csv_exporter.export(sqllog)?;
285        }
286
287        self.stats.record_success();
288        Ok(())
289    }
290
291    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
292        debug!("Exporting {} records to PostgreSQL in batch", sqllogs.len());
293
294        // 直接使用 CSV 导出器的批量导出
295        if let Some(csv_exporter) = &mut self.csv_exporter {
296            csv_exporter.export_batch(sqllogs)?;
297            self.stats.exported += sqllogs.len();
298        }
299
300        Ok(())
301    }
302
303    fn finalize(&mut self) -> Result<()> {
304        // 从 CSV 批量导入
305        self.flush()?;
306
307        // 成功后释放资源,避免 Drop 时重复 finalize 产生告警
308        self.csv_exporter = None;
309        self.temp_csv = None;
310
311        info!(
312            "PostgreSQL export finished (success: {}, failed: {})",
313            self.stats.exported, self.stats.failed
314        );
315
316        Ok(())
317    }
318
319    fn name(&self) -> &'static str {
320        "PostgreSQL"
321    }
322
323    fn stats_snapshot(&self) -> Option<ExportStats> {
324        Some(self.stats.clone())
325    }
326}
327
328impl Drop for PostgresExporter {
329    fn drop(&mut self) {
330        // 仅当仍持有 CSV 导出器与临时文件时才尝试 finalize
331        if self.csv_exporter.is_some()
332            && self.temp_csv.is_some()
333            && let Err(e) = self.finalize()
334        {
335            warn!("PostgreSQL exporter finalization on Drop failed: {e}");
336        }
337    }
338}