dm_database_sqllog2db/exporter/
sqlite.rs1use super::util::strip_ip_prefix;
2use super::{ExportStats, Exporter};
3use crate::error::{Error, ExportError, Result};
4use dm_database_parser_sqllog::Sqllog;
5use log::info;
6use rusqlite::{Connection, params};
7use std::path::Path;
8
9pub struct SqliteExporter {
10 database_url: String,
11 table_name: String,
12 overwrite: bool,
13 append: bool,
14 conn: Option<Connection>,
15 stats: ExportStats,
16}
17
18impl std::fmt::Debug for SqliteExporter {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 f.debug_struct("SqliteExporter")
21 .field("database_url", &self.database_url)
22 .field("table_name", &self.table_name)
23 .field("stats", &self.stats)
24 .finish_non_exhaustive()
25 }
26}
27
28impl SqliteExporter {
29 #[must_use]
30 pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
31 Self {
32 database_url,
33 table_name,
34 overwrite,
35 append,
36 conn: None,
37 stats: ExportStats::new(),
38 }
39 }
40
41 #[must_use]
42 pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
43 Self::new(
44 config.database_url.clone(),
45 config.table_name.clone(),
46 config.overwrite,
47 config.append,
48 )
49 }
50
51 fn db_err(reason: impl Into<String>) -> Error {
52 Error::Export(ExportError::DatabaseError {
53 reason: reason.into(),
54 })
55 }
56}
57
58impl Exporter for SqliteExporter {
59 fn initialize(&mut self) -> Result<()> {
60 info!("Initializing SQLite exporter: {}", self.database_url);
61
62 let path = Path::new(&self.database_url);
63 if let Some(parent) = path.parent().filter(|p| !p.exists()) {
64 std::fs::create_dir_all(parent)
65 .map_err(|e| Self::db_err(format!("create dir failed: {e}")))?;
66 }
67
68 let conn = Connection::open(&self.database_url)
69 .map_err(|e| Self::db_err(format!("open failed: {e}")))?;
70
71 conn.execute_batch(
72 "PRAGMA journal_mode = OFF;
73 PRAGMA synchronous = OFF;
74 PRAGMA cache_size = 1000000;
75 PRAGMA locking_mode = EXCLUSIVE;
76 PRAGMA temp_store = MEMORY;
77 PRAGMA mmap_size = 30000000000;
78 PRAGMA page_size = 65536;
79 PRAGMA threads = 4;",
80 )
81 .map_err(|e| Self::db_err(format!("set PRAGMAs failed: {e}")))?;
82
83 self.conn = Some(conn);
84
85 if self.overwrite {
86 let conn = self.conn.as_ref().unwrap();
87 conn.execute(&format!("DROP TABLE IF EXISTS {}", self.table_name), [])
88 .map_err(|e| Self::db_err(format!("drop table failed: {e}")))?;
89 info!("Dropped existing table: {}", self.table_name);
90 } else if !self.append {
91 let conn = self.conn.as_ref().unwrap();
92 let _ = conn.execute(&format!("DELETE FROM {}", self.table_name), []);
93 }
94
95 let conn = self.conn.as_ref().unwrap();
96 conn.execute(
97 &format!(
98 "CREATE TABLE IF NOT EXISTS {} (
99 ts TEXT NOT NULL, ep INTEGER NOT NULL,
100 sess_id TEXT NOT NULL, thrd_id TEXT NOT NULL,
101 username TEXT NOT NULL, trx_id TEXT NOT NULL,
102 statement TEXT, appname TEXT, client_ip TEXT, tag TEXT,
103 sql TEXT NOT NULL,
104 exec_time_ms REAL, row_count INTEGER, exec_id INTEGER
105 )",
106 self.table_name
107 ),
108 [],
109 )
110 .map_err(|e| Self::db_err(format!("create table failed: {e}")))?;
111
112 conn.execute_batch("BEGIN TRANSACTION;")
113 .map_err(|e| Self::db_err(format!("begin transaction failed: {e}")))?;
114
115 info!("SQLite exporter initialized: {}", self.database_url);
116 Ok(())
117 }
118
119 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
120 let conn = self
121 .conn
122 .as_ref()
123 .ok_or_else(|| Self::db_err("not initialized"))?;
124 let sql = format!(
125 "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
126 self.table_name
127 );
128 let mut stmt = conn
129 .prepare_cached(&sql)
130 .map_err(|e| Self::db_err(format!("prepare failed: {e}")))?;
131
132 let meta = sqllog.parse_meta();
133 let pm = sqllog.parse_performance_metrics();
134 let ind = sqllog.parse_indicators();
135 let (exec_time, row_count, exec_id) = ind.map_or((None, None, None), |i| {
136 (Some(i.exectime), Some(i.rowcount), Some(i.exec_id))
137 });
138
139 stmt.execute(params![
140 sqllog.ts.as_ref(),
141 meta.ep,
142 meta.sess_id.as_ref(),
143 meta.thrd_id.as_ref(),
144 meta.username.as_ref(),
145 meta.trxid.as_ref(),
146 meta.statement.as_ref(),
147 meta.appname.as_ref(),
148 strip_ip_prefix(meta.client_ip.as_ref()),
149 sqllog.tag.as_deref(),
150 pm.sql.as_ref(),
151 exec_time,
152 row_count,
153 exec_id
154 ])
155 .map_err(|e| Self::db_err(format!("insert failed: {e}")))?;
156
157 self.stats.record_success();
158 Ok(())
159 }
160
161 fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
162 if sqllogs.is_empty() {
163 return Ok(());
164 }
165 let conn = self
166 .conn
167 .as_ref()
168 .ok_or_else(|| Self::db_err("not initialized"))?;
169 let sql = format!(
170 "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
171 self.table_name
172 );
173 let mut stmt = conn
174 .prepare_cached(&sql)
175 .map_err(|e| Self::db_err(format!("prepare failed: {e}")))?;
176
177 for sqllog in sqllogs {
178 let meta = sqllog.parse_meta();
179 let pm = sqllog.parse_performance_metrics();
180 let ind = sqllog.parse_indicators();
181 let (exec_time, row_count, exec_id) = ind.map_or((None, None, None), |i| {
182 (Some(i.exectime), Some(i.rowcount), Some(i.exec_id))
183 });
184
185 stmt.execute(params![
186 sqllog.ts.as_ref(),
187 meta.ep,
188 meta.sess_id.as_ref(),
189 meta.thrd_id.as_ref(),
190 meta.username.as_ref(),
191 meta.trxid.as_ref(),
192 meta.statement.as_ref(),
193 meta.appname.as_ref(),
194 strip_ip_prefix(meta.client_ip.as_ref()),
195 sqllog.tag.as_deref(),
196 pm.sql.as_ref(),
197 exec_time,
198 row_count,
199 exec_id
200 ])
201 .map_err(|e| Self::db_err(format!("insert failed: {e}")))?;
202 }
203
204 self.stats.record_success_batch(sqllogs.len());
205 Ok(())
206 }
207
208 fn finalize(&mut self) -> Result<()> {
209 if let Some(conn) = &self.conn {
210 conn.execute_batch("COMMIT;")
211 .map_err(|e| Self::db_err(format!("commit failed: {e}")))?;
212 }
213 info!(
214 "SQLite export finished: {} (success: {}, failed: {})",
215 self.database_url, self.stats.exported, self.stats.failed
216 );
217 Ok(())
218 }
219
220 fn name(&self) -> &'static str {
221 "SQLite"
222 }
223
224 fn stats_snapshot(&self) -> Option<ExportStats> {
225 Some(self.stats.clone())
226 }
227}