Skip to main content

dm_database_sqllog2db/exporter/
sqlite.rs

1use super::{ExportStats, Exporter};
2use crate::error::{Error, ExportError, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::info;
5use rayon::prelude::*;
6use rusqlite::{Connection, params};
7use std::path::Path;
8
9/// `SQLite` 导出器 - 直接插入版本 (高性能)
10pub struct SqliteExporter {
11    database_url: String,
12    table_name: String,
13    overwrite: bool,
14    append: bool,
15    conn: Option<Connection>,
16    stats: ExportStats,
17}
18
19impl std::fmt::Debug for SqliteExporter {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("SqliteExporter")
22            .field("database_url", &self.database_url)
23            .field("table_name", &self.table_name)
24            .field("overwrite", &self.overwrite)
25            .field("append", &self.append)
26            .field("stats", &self.stats)
27            .finish_non_exhaustive()
28    }
29}
30
31impl SqliteExporter {
32    /// 创建新的 `SQLite` 导出器
33    #[must_use]
34    pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
35        Self {
36            database_url,
37            table_name,
38            overwrite,
39            append,
40            conn: None,
41            stats: ExportStats::new(),
42        }
43    }
44
45    /// 从配置创建 `SQLite` 导出器
46    #[must_use]
47    pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
48        Self::new(
49            config.database_url.clone(),
50            config.table_name.clone(),
51            config.overwrite,
52            config.append,
53        )
54    }
55
56    /// 创建数据库表
57    fn create_table(&self) -> Result<()> {
58        let conn = self.conn.as_ref().ok_or_else(|| {
59            Error::Export(ExportError::DatabaseError {
60                reason: "Connection not initialized".to_string(),
61            })
62        })?;
63
64        let sql = format!(
65            r"
66            CREATE TABLE IF NOT EXISTS {} (
67                ts TEXT NOT NULL,
68                ep INTEGER NOT NULL,
69                sess_id TEXT NOT NULL,
70                thrd_id TEXT NOT NULL,
71                username TEXT NOT NULL,
72                trx_id TEXT NOT NULL,
73                statement TEXT NOT NULL,
74                appname TEXT,
75                client_ip TEXT,
76                sql TEXT NOT NULL,
77                exec_time_ms REAL,
78                row_count INTEGER,
79                exec_id INTEGER
80            )
81            ",
82            self.table_name
83        );
84
85        conn.execute(&sql, []).map_err(|e| {
86            Error::Export(ExportError::DatabaseError {
87                reason: format!("Failed to create table: {e}"),
88            })
89        })?;
90
91        info!("SQLite table created or already exists");
92        Ok(())
93    }
94}
95
96impl Exporter for SqliteExporter {
97    fn initialize(&mut self) -> Result<()> {
98        info!("Initializing SQLite exporter: {}", self.database_url);
99
100        // 确保目录存在
101        let path = Path::new(&self.database_url);
102        if let Some(parent) = path.parent().filter(|p| !p.exists()) {
103            std::fs::create_dir_all(parent).map_err(|e| {
104                Error::Export(ExportError::DatabaseError {
105                    reason: format!("Failed to create directory: {e}"),
106                })
107            })?;
108        }
109
110        // 创建数据库连接
111        let conn = Connection::open(&self.database_url).map_err(|e| {
112            Error::Export(ExportError::DatabaseError {
113                reason: format!("Failed to open database: {e}"),
114            })
115        })?;
116
117        // 性能优化: 关闭同步和日志,使用内存模式
118        conn.execute_batch(
119            "PRAGMA journal_mode = OFF;
120             PRAGMA synchronous = OFF;
121             PRAGMA cache_size = 1000000;
122             PRAGMA locking_mode = EXCLUSIVE;
123             PRAGMA temp_store = MEMORY;
124             PRAGMA mmap_size = 30000000000;
125             PRAGMA page_size = 65536;
126             PRAGMA threads = 4;",
127        )
128        .map_err(|e| {
129            Error::Export(ExportError::DatabaseError {
130                reason: format!("Failed to set PRAGMAs: {e}"),
131            })
132        })?;
133
134        self.conn = Some(conn);
135
136        // 处理 overwrite/append 逻辑
137        if self.overwrite {
138            // 如果 overwrite=true,删除已存在的表
139            let drop_sql = format!("DROP TABLE IF EXISTS {}", self.table_name);
140            if let Some(conn) = &self.conn {
141                conn.execute(&drop_sql, []).map_err(|e| {
142                    Error::Export(ExportError::DatabaseError {
143                        reason: format!("Failed to drop table: {e}"),
144                    })
145                })?;
146                info!("Dropped existing table: {}", self.table_name);
147            }
148        } else if !self.append {
149            // 如果 overwrite=false 且 append=false,清空表数据
150            if let Some(conn) = &self.conn {
151                let delete_sql = format!("DELETE FROM {}", self.table_name);
152                // 尝试清空,如果表不存在则忽略错误
153                let _ = conn.execute(&delete_sql, []);
154                info!("Cleared existing data from table: {}", self.table_name);
155            }
156        }
157
158        // 创建表
159        self.create_table()?;
160
161        // 开启事务
162        if let Some(conn) = &self.conn {
163            conn.execute_batch("BEGIN TRANSACTION;").map_err(|e| {
164                Error::Export(ExportError::DatabaseError {
165                    reason: format!("Failed to begin transaction: {e}"),
166                })
167            })?;
168        }
169
170        info!("SQLite exporter initialized: {}", self.database_url);
171        Ok(())
172    }
173
174    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
175        let conn = self.conn.as_ref().ok_or_else(|| {
176            Error::Export(ExportError::DatabaseError {
177                reason: "Connection not initialized".to_string(),
178            })
179        })?;
180
181        // 使用 prepare_cached 缓存预编译语句
182        // 注意:这里每次都 format 字符串,但由于 table_name 不变,字符串内容不变,prepare_cached 会命中缓存
183        let sql = format!(
184            "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
185            self.table_name
186        );
187
188        let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
189            Error::Export(ExportError::DatabaseError {
190                reason: format!("Failed to prepare statement: {e}"),
191            })
192        })?;
193
194        let meta = sqllog.parse_meta();
195        let indicators = sqllog.parse_indicators();
196
197        let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
198            (
199                Some(ind.execute_time),
200                Some(ind.row_count),
201                Some(ind.execute_id),
202            )
203        } else {
204            (None, None, None)
205        };
206
207        stmt.execute(params![
208            sqllog.ts,
209            meta.ep,
210            meta.sess_id,
211            meta.thrd_id,
212            meta.username,
213            meta.trxid,
214            meta.statement,
215            meta.appname,
216            meta.client_ip,
217            sqllog.body().as_ref(),
218            exec_time,
219            row_count,
220            exec_id
221        ])
222        .map_err(|e| {
223            Error::Export(ExportError::DatabaseError {
224                reason: format!("Failed to insert record: {e}"),
225            })
226        })?;
227
228        self.stats.record_success();
229        Ok(())
230    }
231
232    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
233        if sqllogs.is_empty() {
234            return Ok(());
235        }
236
237        let conn = self.conn.as_ref().ok_or_else(|| {
238            Error::Export(ExportError::DatabaseError {
239                reason: "Connection not initialized".to_string(),
240            })
241        })?;
242
243        let sql = format!(
244            "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
245            self.table_name
246        );
247
248        let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
249            Error::Export(ExportError::DatabaseError {
250                reason: format!("Failed to prepare statement: {e}"),
251            })
252        })?;
253
254        // 内存优化:流式处理避免峰值
255        // 分块处理(每 500 条),避免存储大量中间记录
256        const CHUNK_SIZE: usize = 500;
257        for chunk in sqllogs.chunks(CHUNK_SIZE) {
258            let records: Vec<_> = chunk
259                .par_iter()
260                .map(|sqllog| {
261                    let meta = sqllog.parse_meta();
262                    let indicators = sqllog.parse_indicators();
263                    let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
264                        (
265                            Some(ind.execute_time),
266                            Some(ind.row_count),
267                            Some(ind.execute_id),
268                        )
269                    } else {
270                        (None, None, None)
271                    };
272                    (
273                        sqllog.ts.to_string(),
274                        meta.ep,
275                        meta.sess_id.to_string(),
276                        meta.thrd_id.to_string(),
277                        meta.username.to_string(),
278                        meta.trxid.to_string(),
279                        meta.statement.to_string(),
280                        meta.appname.to_string(),
281                        meta.client_ip.to_string(),
282                        sqllog.body().to_string(),
283                        exec_time,
284                        row_count,
285                        exec_id,
286                    )
287                })
288                .collect();
289
290            for (
291                ts,
292                ep,
293                sess_id,
294                thrd_id,
295                username,
296                trxid,
297                statement,
298                appname,
299                client_ip,
300                sql_body,
301                exec_time,
302                row_count,
303                exec_id,
304            ) in records
305            {
306                stmt.execute(params![
307                    ts, ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip,
308                    sql_body, exec_time, row_count, exec_id
309                ])
310                .map_err(|e| {
311                    Error::Export(ExportError::DatabaseError {
312                        reason: format!("Failed to insert record: {e}"),
313                    })
314                })?;
315
316                self.stats.record_success();
317            }
318        }
319
320        Ok(())
321    }
322
323    fn finalize(&mut self) -> Result<()> {
324        // 提交事务
325        if let Some(conn) = &self.conn {
326            conn.execute_batch("COMMIT;").map_err(|e| {
327                Error::Export(ExportError::DatabaseError {
328                    reason: format!("Failed to commit transaction: {e}"),
329                })
330            })?;
331        }
332
333        info!(
334            "SQLite export finished: {} (success: {}, failed: {})",
335            self.database_url, self.stats.exported, self.stats.failed
336        );
337
338        Ok(())
339    }
340
341    fn name(&self) -> &'static str {
342        "SQLite"
343    }
344
345    fn stats_snapshot(&self) -> Option<ExportStats> {
346        Some(self.stats.clone())
347    }
348}
349
350impl Drop for SqliteExporter {
351    fn drop(&mut self) {
352        // 如果连接存在且未显式 finalize (可能 panic 或提前退出),尝试回滚或提交?
353        // 这里不做复杂处理,依赖 OS 回收文件锁
354        // 如果事务未提交,SQLite 会自动回滚
355    }
356}