Skip to main content

dm_database_sqllog2db/exporter/
sqlite.rs

1use super::{ExportStats, Exporter};
2use crate::error::{Error, ExportError, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::info;
5use rayon::prelude::*;
6use rusqlite::{Connection, params};
7use std::path::Path;
8
9/// `SQLite` 导出器 - 直接插入版本 (高性能)
10pub struct SqliteExporter {
11    database_url: String,
12    table_name: String,
13    overwrite: bool,
14    append: bool,
15    conn: Option<Connection>,
16    stats: ExportStats,
17}
18
19impl std::fmt::Debug for SqliteExporter {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("SqliteExporter")
22            .field("database_url", &self.database_url)
23            .field("table_name", &self.table_name)
24            .field("overwrite", &self.overwrite)
25            .field("append", &self.append)
26            .field("stats", &self.stats)
27            .finish_non_exhaustive()
28    }
29}
30
31impl SqliteExporter {
32    /// 创建新的 `SQLite` 导出器
33    #[must_use]
34    pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
35        Self {
36            database_url,
37            table_name,
38            overwrite,
39            append,
40            conn: None,
41            stats: ExportStats::new(),
42        }
43    }
44
45    /// 从配置创建 `SQLite` 导出器
46    #[must_use]
47    pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
48        Self::new(
49            config.database_url.clone(),
50            config.table_name.clone(),
51            config.overwrite,
52            config.append,
53        )
54    }
55
56    /// 创建数据库表
57    fn create_table(&self) -> Result<()> {
58        let conn = self.conn.as_ref().ok_or_else(|| {
59            Error::Export(ExportError::DatabaseError {
60                reason: "Connection not initialized".to_string(),
61            })
62        })?;
63
64        let sql = format!(
65            r"
66            CREATE TABLE IF NOT EXISTS {} (
67                ts TEXT NOT NULL,
68                ep INTEGER NOT NULL,
69                sess_id TEXT NOT NULL,
70                thrd_id TEXT NOT NULL,
71                username TEXT NOT NULL,
72                trx_id TEXT NOT NULL,
73                statement TEXT,
74                appname TEXT,
75                client_ip TEXT,
76                tag TEXT,
77                sql TEXT NOT NULL,
78                exec_time_ms REAL,
79                row_count INTEGER,
80                exec_id INTEGER
81            )
82            ",
83            self.table_name
84        );
85
86        conn.execute(&sql, []).map_err(|e| {
87            Error::Export(ExportError::DatabaseError {
88                reason: format!("Failed to create table: {e}"),
89            })
90        })?;
91
92        info!("SQLite table created or already exists");
93        Ok(())
94    }
95}
96
97impl Exporter for SqliteExporter {
98    fn initialize(&mut self) -> Result<()> {
99        info!("Initializing SQLite exporter: {}", self.database_url);
100
101        // 确保目录存在
102        let path = Path::new(&self.database_url);
103        if let Some(parent) = path.parent().filter(|p| !p.exists()) {
104            std::fs::create_dir_all(parent).map_err(|e| {
105                Error::Export(ExportError::DatabaseError {
106                    reason: format!("Failed to create directory: {e}"),
107                })
108            })?;
109        }
110
111        // 创建数据库连接
112        let conn = Connection::open(&self.database_url).map_err(|e| {
113            Error::Export(ExportError::DatabaseError {
114                reason: format!("Failed to open database: {e}"),
115            })
116        })?;
117
118        // 性能优化: 关闭同步和日志,使用内存模式
119        conn.execute_batch(
120            "PRAGMA journal_mode = OFF;
121             PRAGMA synchronous = OFF;
122             PRAGMA cache_size = 1000000;
123             PRAGMA locking_mode = EXCLUSIVE;
124             PRAGMA temp_store = MEMORY;
125             PRAGMA mmap_size = 30000000000;
126             PRAGMA page_size = 65536;
127             PRAGMA threads = 4;",
128        )
129        .map_err(|e| {
130            Error::Export(ExportError::DatabaseError {
131                reason: format!("Failed to set PRAGMAs: {e}"),
132            })
133        })?;
134
135        self.conn = Some(conn);
136
137        // 处理 overwrite/append 逻辑
138        if self.overwrite {
139            // 如果 overwrite=true,删除已存在的表
140            let drop_sql = format!("DROP TABLE IF EXISTS {}", self.table_name);
141            if let Some(conn) = &self.conn {
142                conn.execute(&drop_sql, []).map_err(|e| {
143                    Error::Export(ExportError::DatabaseError {
144                        reason: format!("Failed to drop table: {e}"),
145                    })
146                })?;
147                info!("Dropped existing table: {}", self.table_name);
148            }
149        } else if !self.append {
150            // 如果 overwrite=false 且 append=false,清空表数据
151            if let Some(conn) = &self.conn {
152                let delete_sql = format!("DELETE FROM {}", self.table_name);
153                // 尝试清空,如果表不存在则忽略错误
154                let _ = conn.execute(&delete_sql, []);
155                info!("Cleared existing data from table: {}", self.table_name);
156            }
157        }
158
159        // 创建表
160        self.create_table()?;
161
162        // 开启事务
163        if let Some(conn) = &self.conn {
164            conn.execute_batch("BEGIN TRANSACTION;").map_err(|e| {
165                Error::Export(ExportError::DatabaseError {
166                    reason: format!("Failed to begin transaction: {e}"),
167                })
168            })?;
169        }
170
171        info!("SQLite exporter initialized: {}", self.database_url);
172        Ok(())
173    }
174
175    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
176        let conn = self.conn.as_ref().ok_or_else(|| {
177            Error::Export(ExportError::DatabaseError {
178                reason: "Connection not initialized".to_string(),
179            })
180        })?;
181
182        // 使用 prepare_cached 缓存预编译语句
183        // 注意:这里每次都 format 字符串,但由于 table_name 不变,字符串内容不变,prepare_cached 会命中缓存
184        let sql = format!(
185            "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
186            self.table_name
187        );
188
189        let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
190            Error::Export(ExportError::DatabaseError {
191                reason: format!("Failed to prepare statement: {e}"),
192            })
193        })?;
194
195        let meta = sqllog.parse_meta();
196        let indicators = sqllog.parse_indicators();
197
198        let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
199            (
200                Some(ind.execute_time),
201                Some(ind.row_count),
202                Some(ind.execute_id),
203            )
204        } else {
205            (None, None, None)
206        };
207
208        stmt.execute(params![
209            sqllog.ts,
210            meta.ep,
211            meta.sess_id,
212            meta.thrd_id,
213            meta.username,
214            meta.trxid,
215            meta.statement,
216            meta.appname,
217            meta.client_ip,
218            sqllog.tag,
219            sqllog.body().as_ref(),
220            exec_time,
221            row_count,
222            exec_id
223        ])
224        .map_err(|e| {
225            Error::Export(ExportError::DatabaseError {
226                reason: format!("Failed to insert record: {e}"),
227            })
228        })?;
229
230        self.stats.record_success();
231        Ok(())
232    }
233
234    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
235        if sqllogs.is_empty() {
236            return Ok(());
237        }
238
239        let conn = self.conn.as_ref().ok_or_else(|| {
240            Error::Export(ExportError::DatabaseError {
241                reason: "Connection not initialized".to_string(),
242            })
243        })?;
244
245        let sql = format!(
246            "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
247            self.table_name
248        );
249
250        let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
251            Error::Export(ExportError::DatabaseError {
252                reason: format!("Failed to prepare statement: {e}"),
253            })
254        })?;
255
256        // 内存优化:流式处理避免峰值
257        // 分块处理(每 500 条),避免存储大量中间记录
258        const CHUNK_SIZE: usize = 500;
259        for chunk in sqllogs.chunks(CHUNK_SIZE) {
260            let records: Vec<_> = chunk
261                .par_iter()
262                .map(|sqllog| {
263                    let meta = sqllog.parse_meta();
264                    let indicators = sqllog.parse_indicators();
265                    let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
266                        (
267                            Some(ind.execute_time),
268                            Some(ind.row_count),
269                            Some(ind.execute_id),
270                        )
271                    } else {
272                        (None, None, None)
273                    };
274                    (
275                        sqllog.ts.to_string(),
276                        meta.ep,
277                        meta.sess_id.to_string(),
278                        meta.thrd_id.to_string(),
279                        meta.username.to_string(),
280                        meta.trxid.to_string(),
281                        meta.statement.to_string(),
282                        meta.appname.to_string(),
283                        meta.client_ip.to_string(),
284                        sqllog.tag.as_ref().map(ToString::to_string),
285                        sqllog.body().to_string(),
286                        exec_time,
287                        row_count,
288                        exec_id,
289                    )
290                })
291                .collect();
292
293            for (
294                ts,
295                ep,
296                sess_id,
297                thrd_id,
298                username,
299                trxid,
300                statement,
301                appname,
302                client_ip,
303                tag,
304                sql_body,
305                exec_time,
306                row_count,
307                exec_id,
308            ) in records
309            {
310                stmt.execute(params![
311                    ts, ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip, tag,
312                    sql_body, exec_time, row_count, exec_id
313                ])
314                .map_err(|e| {
315                    Error::Export(ExportError::DatabaseError {
316                        reason: format!("Failed to insert record: {e}"),
317                    })
318                })?;
319
320                self.stats.record_success();
321            }
322        }
323
324        Ok(())
325    }
326
327    fn finalize(&mut self) -> Result<()> {
328        // 提交事务
329        if let Some(conn) = &self.conn {
330            conn.execute_batch("COMMIT;").map_err(|e| {
331                Error::Export(ExportError::DatabaseError {
332                    reason: format!("Failed to commit transaction: {e}"),
333                })
334            })?;
335        }
336
337        info!(
338            "SQLite export finished: {} (success: {}, failed: {})",
339            self.database_url, self.stats.exported, self.stats.failed
340        );
341
342        Ok(())
343    }
344
345    fn name(&self) -> &'static str {
346        "SQLite"
347    }
348
349    fn stats_snapshot(&self) -> Option<ExportStats> {
350        Some(self.stats.clone())
351    }
352}
353
354impl Drop for SqliteExporter {
355    fn drop(&mut self) {
356        // 如果连接存在且未显式 finalize (可能 panic 或提前退出),尝试回滚或提交?
357        // 这里不做复杂处理,依赖 OS 回收文件锁
358        // 如果事务未提交,SQLite 会自动回滚
359    }
360}