// dm_database_sqllog2db/exporter/sqlite.rs
use super::{ExportStats, Exporter};
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use log::{debug, info};
use rayon::prelude::*;
use rusqlite::{Connection, params};
use std::path::Path;
8
9pub struct SqliteExporter {
11 database_url: String,
12 table_name: String,
13 overwrite: bool,
14 append: bool,
15 conn: Option<Connection>,
16 stats: ExportStats,
17}
18
19impl std::fmt::Debug for SqliteExporter {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 f.debug_struct("SqliteExporter")
22 .field("database_url", &self.database_url)
23 .field("table_name", &self.table_name)
24 .field("overwrite", &self.overwrite)
25 .field("append", &self.append)
26 .field("stats", &self.stats)
27 .finish_non_exhaustive()
28 }
29}
30
31impl SqliteExporter {
32 #[must_use]
34 pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
35 Self {
36 database_url,
37 table_name,
38 overwrite,
39 append,
40 conn: None,
41 stats: ExportStats::new(),
42 }
43 }
44
45 #[must_use]
47 pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
48 Self::new(
49 config.database_url.clone(),
50 config.table_name.clone(),
51 config.overwrite,
52 config.append,
53 )
54 }
55
56 fn create_table(&self) -> Result<()> {
58 let conn = self.conn.as_ref().ok_or_else(|| {
59 Error::Export(ExportError::DatabaseError {
60 reason: "Connection not initialized".to_string(),
61 })
62 })?;
63
64 let sql = format!(
65 r"
66 CREATE TABLE IF NOT EXISTS {} (
67 ts TEXT NOT NULL,
68 ep INTEGER NOT NULL,
69 sess_id TEXT NOT NULL,
70 thrd_id TEXT NOT NULL,
71 username TEXT NOT NULL,
72 trx_id TEXT NOT NULL,
73 statement TEXT NOT NULL,
74 appname TEXT,
75 client_ip TEXT,
76 sql TEXT NOT NULL,
77 exec_time_ms REAL,
78 row_count INTEGER,
79 exec_id INTEGER
80 )
81 ",
82 self.table_name
83 );
84
85 conn.execute(&sql, []).map_err(|e| {
86 Error::Export(ExportError::DatabaseError {
87 reason: format!("Failed to create table: {e}"),
88 })
89 })?;
90
91 info!("SQLite table created or already exists");
92 Ok(())
93 }
94}
95
96impl Exporter for SqliteExporter {
97 fn initialize(&mut self) -> Result<()> {
98 info!("Initializing SQLite exporter: {}", self.database_url);
99
100 let path = Path::new(&self.database_url);
102 if let Some(parent) = path.parent().filter(|p| !p.exists()) {
103 std::fs::create_dir_all(parent).map_err(|e| {
104 Error::Export(ExportError::DatabaseError {
105 reason: format!("Failed to create directory: {e}"),
106 })
107 })?;
108 }
109
110 let conn = Connection::open(&self.database_url).map_err(|e| {
112 Error::Export(ExportError::DatabaseError {
113 reason: format!("Failed to open database: {e}"),
114 })
115 })?;
116
117 conn.execute_batch(
119 "PRAGMA journal_mode = OFF;
120 PRAGMA synchronous = OFF;
121 PRAGMA cache_size = 1000000;
122 PRAGMA locking_mode = EXCLUSIVE;
123 PRAGMA temp_store = MEMORY;
124 PRAGMA mmap_size = 30000000000;
125 PRAGMA page_size = 65536;
126 PRAGMA threads = 4;",
127 )
128 .map_err(|e| {
129 Error::Export(ExportError::DatabaseError {
130 reason: format!("Failed to set PRAGMAs: {e}"),
131 })
132 })?;
133
134 self.conn = Some(conn);
135
136 if self.overwrite {
138 let drop_sql = format!("DROP TABLE IF EXISTS {}", self.table_name);
140 if let Some(conn) = &self.conn {
141 conn.execute(&drop_sql, []).map_err(|e| {
142 Error::Export(ExportError::DatabaseError {
143 reason: format!("Failed to drop table: {e}"),
144 })
145 })?;
146 info!("Dropped existing table: {}", self.table_name);
147 }
148 } else if !self.append {
149 if let Some(conn) = &self.conn {
151 let delete_sql = format!("DELETE FROM {}", self.table_name);
152 let _ = conn.execute(&delete_sql, []);
154 info!("Cleared existing data from table: {}", self.table_name);
155 }
156 }
157
158 self.create_table()?;
160
161 if let Some(conn) = &self.conn {
163 conn.execute_batch("BEGIN TRANSACTION;").map_err(|e| {
164 Error::Export(ExportError::DatabaseError {
165 reason: format!("Failed to begin transaction: {e}"),
166 })
167 })?;
168 }
169
170 info!("SQLite exporter initialized: {}", self.database_url);
171 Ok(())
172 }
173
174 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
175 let conn = self.conn.as_ref().ok_or_else(|| {
176 Error::Export(ExportError::DatabaseError {
177 reason: "Connection not initialized".to_string(),
178 })
179 })?;
180
181 let sql = format!(
184 "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
185 self.table_name
186 );
187
188 let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
189 Error::Export(ExportError::DatabaseError {
190 reason: format!("Failed to prepare statement: {e}"),
191 })
192 })?;
193
194 let meta = sqllog.parse_meta();
195 let indicators = sqllog.parse_indicators();
196
197 let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
198 (
199 Some(ind.execute_time),
200 Some(ind.row_count),
201 Some(ind.execute_id),
202 )
203 } else {
204 (None, None, None)
205 };
206
207 stmt.execute(params![
208 sqllog.ts,
209 meta.ep,
210 meta.sess_id,
211 meta.thrd_id,
212 meta.username,
213 meta.trxid,
214 meta.statement,
215 meta.appname,
216 meta.client_ip,
217 sqllog.body().as_ref(),
218 exec_time,
219 row_count,
220 exec_id
221 ])
222 .map_err(|e| {
223 Error::Export(ExportError::DatabaseError {
224 reason: format!("Failed to insert record: {e}"),
225 })
226 })?;
227
228 self.stats.record_success();
229 Ok(())
230 }
231
232 fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
233 if sqllogs.is_empty() {
234 return Ok(());
235 }
236
237 let conn = self.conn.as_ref().ok_or_else(|| {
238 Error::Export(ExportError::DatabaseError {
239 reason: "Connection not initialized".to_string(),
240 })
241 })?;
242
243 let sql = format!(
244 "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
245 self.table_name
246 );
247
248 let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
249 Error::Export(ExportError::DatabaseError {
250 reason: format!("Failed to prepare statement: {e}"),
251 })
252 })?;
253
254 const CHUNK_SIZE: usize = 500;
257 for chunk in sqllogs.chunks(CHUNK_SIZE) {
258 let records: Vec<_> = chunk
259 .par_iter()
260 .map(|sqllog| {
261 let meta = sqllog.parse_meta();
262 let indicators = sqllog.parse_indicators();
263 let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
264 (
265 Some(ind.execute_time),
266 Some(ind.row_count),
267 Some(ind.execute_id),
268 )
269 } else {
270 (None, None, None)
271 };
272 (
273 sqllog.ts.to_string(),
274 meta.ep,
275 meta.sess_id.to_string(),
276 meta.thrd_id.to_string(),
277 meta.username.to_string(),
278 meta.trxid.to_string(),
279 meta.statement.to_string(),
280 meta.appname.to_string(),
281 meta.client_ip.to_string(),
282 sqllog.body().to_string(),
283 exec_time,
284 row_count,
285 exec_id,
286 )
287 })
288 .collect();
289
290 for (
291 ts,
292 ep,
293 sess_id,
294 thrd_id,
295 username,
296 trxid,
297 statement,
298 appname,
299 client_ip,
300 sql_body,
301 exec_time,
302 row_count,
303 exec_id,
304 ) in records
305 {
306 stmt.execute(params![
307 ts, ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip,
308 sql_body, exec_time, row_count, exec_id
309 ])
310 .map_err(|e| {
311 Error::Export(ExportError::DatabaseError {
312 reason: format!("Failed to insert record: {e}"),
313 })
314 })?;
315
316 self.stats.record_success();
317 }
318 }
319
320 Ok(())
321 }
322
323 fn finalize(&mut self) -> Result<()> {
324 if let Some(conn) = &self.conn {
326 conn.execute_batch("COMMIT;").map_err(|e| {
327 Error::Export(ExportError::DatabaseError {
328 reason: format!("Failed to commit transaction: {e}"),
329 })
330 })?;
331 }
332
333 info!(
334 "SQLite export finished: {} (success: {}, failed: {})",
335 self.database_url, self.stats.exported, self.stats.failed
336 );
337
338 Ok(())
339 }
340
341 fn name(&self) -> &'static str {
342 "SQLite"
343 }
344
345 fn stats_snapshot(&self) -> Option<ExportStats> {
346 Some(self.stats.clone())
347 }
348}
349
350impl Drop for SqliteExporter {
351 fn drop(&mut self) {
352 }
356}