1use super::{ExportStats, Exporter};
2use crate::error::{Error, ExportError, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::info;
5use rayon::prelude::*;
6use rusqlite::{Connection, params};
7use std::path::Path;
8
9pub struct SqliteExporter {
11 database_url: String,
12 table_name: String,
13 overwrite: bool,
14 append: bool,
15 conn: Option<Connection>,
16 stats: ExportStats,
17}
18
19impl std::fmt::Debug for SqliteExporter {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 f.debug_struct("SqliteExporter")
22 .field("database_url", &self.database_url)
23 .field("table_name", &self.table_name)
24 .field("overwrite", &self.overwrite)
25 .field("append", &self.append)
26 .field("stats", &self.stats)
27 .finish_non_exhaustive()
28 }
29}
30
31impl SqliteExporter {
32 #[must_use]
34 pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
35 Self {
36 database_url,
37 table_name,
38 overwrite,
39 append,
40 conn: None,
41 stats: ExportStats::new(),
42 }
43 }
44
45 #[must_use]
47 pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
48 Self::new(
49 config.database_url.clone(),
50 config.table_name.clone(),
51 config.overwrite,
52 config.append,
53 )
54 }
55
56 fn create_table(&self) -> Result<()> {
58 let conn = self.conn.as_ref().ok_or_else(|| {
59 Error::Export(ExportError::DatabaseError {
60 reason: "Connection not initialized".to_string(),
61 })
62 })?;
63
64 let sql = format!(
65 r"
66 CREATE TABLE IF NOT EXISTS {} (
67 ts TEXT NOT NULL,
68 ep INTEGER NOT NULL,
69 sess_id TEXT NOT NULL,
70 thrd_id TEXT NOT NULL,
71 username TEXT NOT NULL,
72 trx_id TEXT NOT NULL,
73 statement TEXT,
74 appname TEXT,
75 client_ip TEXT,
76 tag TEXT,
77 sql TEXT NOT NULL,
78 exec_time_ms REAL,
79 row_count INTEGER,
80 exec_id INTEGER
81 )
82 ",
83 self.table_name
84 );
85
86 conn.execute(&sql, []).map_err(|e| {
87 Error::Export(ExportError::DatabaseError {
88 reason: format!("Failed to create table: {e}"),
89 })
90 })?;
91
92 info!("SQLite table created or already exists");
93 Ok(())
94 }
95}
96
97impl Exporter for SqliteExporter {
98 fn initialize(&mut self) -> Result<()> {
99 info!("Initializing SQLite exporter: {}", self.database_url);
100
101 let path = Path::new(&self.database_url);
103 if let Some(parent) = path.parent().filter(|p| !p.exists()) {
104 std::fs::create_dir_all(parent).map_err(|e| {
105 Error::Export(ExportError::DatabaseError {
106 reason: format!("Failed to create directory: {e}"),
107 })
108 })?;
109 }
110
111 let conn = Connection::open(&self.database_url).map_err(|e| {
113 Error::Export(ExportError::DatabaseError {
114 reason: format!("Failed to open database: {e}"),
115 })
116 })?;
117
118 conn.execute_batch(
120 "PRAGMA journal_mode = OFF;
121 PRAGMA synchronous = OFF;
122 PRAGMA cache_size = 1000000;
123 PRAGMA locking_mode = EXCLUSIVE;
124 PRAGMA temp_store = MEMORY;
125 PRAGMA mmap_size = 30000000000;
126 PRAGMA page_size = 65536;
127 PRAGMA threads = 4;",
128 )
129 .map_err(|e| {
130 Error::Export(ExportError::DatabaseError {
131 reason: format!("Failed to set PRAGMAs: {e}"),
132 })
133 })?;
134
135 self.conn = Some(conn);
136
137 if self.overwrite {
139 let drop_sql = format!("DROP TABLE IF EXISTS {}", self.table_name);
141 if let Some(conn) = &self.conn {
142 conn.execute(&drop_sql, []).map_err(|e| {
143 Error::Export(ExportError::DatabaseError {
144 reason: format!("Failed to drop table: {e}"),
145 })
146 })?;
147 info!("Dropped existing table: {}", self.table_name);
148 }
149 } else if !self.append {
150 if let Some(conn) = &self.conn {
152 let delete_sql = format!("DELETE FROM {}", self.table_name);
153 let _ = conn.execute(&delete_sql, []);
155 info!("Cleared existing data from table: {}", self.table_name);
156 }
157 }
158
159 self.create_table()?;
161
162 if let Some(conn) = &self.conn {
164 conn.execute_batch("BEGIN TRANSACTION;").map_err(|e| {
165 Error::Export(ExportError::DatabaseError {
166 reason: format!("Failed to begin transaction: {e}"),
167 })
168 })?;
169 }
170
171 info!("SQLite exporter initialized: {}", self.database_url);
172 Ok(())
173 }
174
175 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
176 let conn = self.conn.as_ref().ok_or_else(|| {
177 Error::Export(ExportError::DatabaseError {
178 reason: "Connection not initialized".to_string(),
179 })
180 })?;
181
182 let sql = format!(
185 "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
186 self.table_name
187 );
188
189 let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
190 Error::Export(ExportError::DatabaseError {
191 reason: format!("Failed to prepare statement: {e}"),
192 })
193 })?;
194
195 let meta = sqllog.parse_meta();
196 let indicators = sqllog.parse_indicators();
197
198 let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
199 (
200 Some(ind.execute_time),
201 Some(ind.row_count),
202 Some(ind.execute_id),
203 )
204 } else {
205 (None, None, None)
206 };
207
208 stmt.execute(params![
209 sqllog.ts,
210 meta.ep,
211 meta.sess_id,
212 meta.thrd_id,
213 meta.username,
214 meta.trxid,
215 meta.statement,
216 meta.appname,
217 meta.client_ip,
218 sqllog.tag,
219 sqllog.body().as_ref(),
220 exec_time,
221 row_count,
222 exec_id
223 ])
224 .map_err(|e| {
225 Error::Export(ExportError::DatabaseError {
226 reason: format!("Failed to insert record: {e}"),
227 })
228 })?;
229
230 self.stats.record_success();
231 Ok(())
232 }
233
234 fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
235 if sqllogs.is_empty() {
236 return Ok(());
237 }
238
239 let conn = self.conn.as_ref().ok_or_else(|| {
240 Error::Export(ExportError::DatabaseError {
241 reason: "Connection not initialized".to_string(),
242 })
243 })?;
244
245 let sql = format!(
246 "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
247 self.table_name
248 );
249
250 let mut stmt = conn.prepare_cached(&sql).map_err(|e| {
251 Error::Export(ExportError::DatabaseError {
252 reason: format!("Failed to prepare statement: {e}"),
253 })
254 })?;
255
256 const CHUNK_SIZE: usize = 500;
259 for chunk in sqllogs.chunks(CHUNK_SIZE) {
260 let records: Vec<_> = chunk
261 .par_iter()
262 .map(|sqllog| {
263 let meta = sqllog.parse_meta();
264 let indicators = sqllog.parse_indicators();
265 let (exec_time, row_count, exec_id) = if let Some(ind) = indicators {
266 (
267 Some(ind.execute_time),
268 Some(ind.row_count),
269 Some(ind.execute_id),
270 )
271 } else {
272 (None, None, None)
273 };
274 (
275 sqllog.ts.to_string(),
276 meta.ep,
277 meta.sess_id.to_string(),
278 meta.thrd_id.to_string(),
279 meta.username.to_string(),
280 meta.trxid.to_string(),
281 meta.statement.to_string(),
282 meta.appname.to_string(),
283 meta.client_ip.to_string(),
284 sqllog.tag.as_ref().map(ToString::to_string),
285 sqllog.body().to_string(),
286 exec_time,
287 row_count,
288 exec_id,
289 )
290 })
291 .collect();
292
293 for (
294 ts,
295 ep,
296 sess_id,
297 thrd_id,
298 username,
299 trxid,
300 statement,
301 appname,
302 client_ip,
303 tag,
304 sql_body,
305 exec_time,
306 row_count,
307 exec_id,
308 ) in records
309 {
310 stmt.execute(params![
311 ts, ep, sess_id, thrd_id, username, trxid, statement, appname, client_ip, tag,
312 sql_body, exec_time, row_count, exec_id
313 ])
314 .map_err(|e| {
315 Error::Export(ExportError::DatabaseError {
316 reason: format!("Failed to insert record: {e}"),
317 })
318 })?;
319
320 self.stats.record_success();
321 }
322 }
323
324 Ok(())
325 }
326
327 fn finalize(&mut self) -> Result<()> {
328 if let Some(conn) = &self.conn {
330 conn.execute_batch("COMMIT;").map_err(|e| {
331 Error::Export(ExportError::DatabaseError {
332 reason: format!("Failed to commit transaction: {e}"),
333 })
334 })?;
335 }
336
337 info!(
338 "SQLite export finished: {} (success: {}, failed: {})",
339 self.database_url, self.stats.exported, self.stats.failed
340 );
341
342 Ok(())
343 }
344
345 fn name(&self) -> &'static str {
346 "SQLite"
347 }
348
349 fn stats_snapshot(&self) -> Option<ExportStats> {
350 Some(self.stats.clone())
351 }
352}
353
354impl Drop for SqliteExporter {
355 fn drop(&mut self) {
356 }
360}