// dm_database_sqllog2db/exporter/sqlite.rs

1use super::util::strip_ip_prefix;
2use super::{ExportStats, Exporter};
3use crate::error::{Error, ExportError, Result};
4use dm_database_parser_sqllog::Sqllog;
5use log::info;
6use rusqlite::{Connection, params};
7use std::path::Path;
8
9pub struct SqliteExporter {
10    database_url: String,
11    table_name: String,
12    overwrite: bool,
13    append: bool,
14    conn: Option<Connection>,
15    stats: ExportStats,
16}
17
18impl std::fmt::Debug for SqliteExporter {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        f.debug_struct("SqliteExporter")
21            .field("database_url", &self.database_url)
22            .field("table_name", &self.table_name)
23            .field("stats", &self.stats)
24            .finish_non_exhaustive()
25    }
26}
27
28impl SqliteExporter {
29    #[must_use]
30    pub fn new(database_url: String, table_name: String, overwrite: bool, append: bool) -> Self {
31        Self {
32            database_url,
33            table_name,
34            overwrite,
35            append,
36            conn: None,
37            stats: ExportStats::new(),
38        }
39    }
40
41    #[must_use]
42    pub fn from_config(config: &crate::config::SqliteExporter) -> Self {
43        Self::new(
44            config.database_url.clone(),
45            config.table_name.clone(),
46            config.overwrite,
47            config.append,
48        )
49    }
50
51    fn db_err(reason: impl Into<String>) -> Error {
52        Error::Export(ExportError::DatabaseError {
53            reason: reason.into(),
54        })
55    }
56}
57
58impl Exporter for SqliteExporter {
59    fn initialize(&mut self) -> Result<()> {
60        info!("Initializing SQLite exporter: {}", self.database_url);
61
62        let path = Path::new(&self.database_url);
63        if let Some(parent) = path.parent().filter(|p| !p.exists()) {
64            std::fs::create_dir_all(parent)
65                .map_err(|e| Self::db_err(format!("create dir failed: {e}")))?;
66        }
67
68        let conn = Connection::open(&self.database_url)
69            .map_err(|e| Self::db_err(format!("open failed: {e}")))?;
70
71        conn.execute_batch(
72            "PRAGMA journal_mode = OFF;
73             PRAGMA synchronous = OFF;
74             PRAGMA cache_size = 1000000;
75             PRAGMA locking_mode = EXCLUSIVE;
76             PRAGMA temp_store = MEMORY;
77             PRAGMA mmap_size = 30000000000;
78             PRAGMA page_size = 65536;
79             PRAGMA threads = 4;",
80        )
81        .map_err(|e| Self::db_err(format!("set PRAGMAs failed: {e}")))?;
82
83        self.conn = Some(conn);
84
85        if self.overwrite {
86            let conn = self.conn.as_ref().unwrap();
87            conn.execute(&format!("DROP TABLE IF EXISTS {}", self.table_name), [])
88                .map_err(|e| Self::db_err(format!("drop table failed: {e}")))?;
89            info!("Dropped existing table: {}", self.table_name);
90        } else if !self.append {
91            let conn = self.conn.as_ref().unwrap();
92            let _ = conn.execute(&format!("DELETE FROM {}", self.table_name), []);
93        }
94
95        let conn = self.conn.as_ref().unwrap();
96        conn.execute(
97            &format!(
98                "CREATE TABLE IF NOT EXISTS {} (
99                    ts TEXT NOT NULL, ep INTEGER NOT NULL,
100                    sess_id TEXT NOT NULL, thrd_id TEXT NOT NULL,
101                    username TEXT NOT NULL, trx_id TEXT NOT NULL,
102                    statement TEXT, appname TEXT, client_ip TEXT, tag TEXT,
103                    sql TEXT NOT NULL,
104                    exec_time_ms REAL, row_count INTEGER, exec_id INTEGER
105                )",
106                self.table_name
107            ),
108            [],
109        )
110        .map_err(|e| Self::db_err(format!("create table failed: {e}")))?;
111
112        conn.execute_batch("BEGIN TRANSACTION;")
113            .map_err(|e| Self::db_err(format!("begin transaction failed: {e}")))?;
114
115        info!("SQLite exporter initialized: {}", self.database_url);
116        Ok(())
117    }
118
119    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
120        let conn = self
121            .conn
122            .as_ref()
123            .ok_or_else(|| Self::db_err("not initialized"))?;
124        let sql = format!(
125            "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
126            self.table_name
127        );
128        let mut stmt = conn
129            .prepare_cached(&sql)
130            .map_err(|e| Self::db_err(format!("prepare failed: {e}")))?;
131
132        let meta = sqllog.parse_meta();
133        let pm = sqllog.parse_performance_metrics();
134        let ind = sqllog.parse_indicators();
135        let (exec_time, row_count, exec_id) = ind.map_or((None, None, None), |i| {
136            (Some(i.exectime), Some(i.rowcount), Some(i.exec_id))
137        });
138
139        stmt.execute(params![
140            sqllog.ts.as_ref(),
141            meta.ep,
142            meta.sess_id.as_ref(),
143            meta.thrd_id.as_ref(),
144            meta.username.as_ref(),
145            meta.trxid.as_ref(),
146            meta.statement.as_ref(),
147            meta.appname.as_ref(),
148            strip_ip_prefix(meta.client_ip.as_ref()),
149            sqllog.tag.as_deref(),
150            pm.sql.as_ref(),
151            exec_time,
152            row_count,
153            exec_id
154        ])
155        .map_err(|e| Self::db_err(format!("insert failed: {e}")))?;
156
157        self.stats.record_success();
158        Ok(())
159    }
160
161    fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
162        if sqllogs.is_empty() {
163            return Ok(());
164        }
165        let conn = self
166            .conn
167            .as_ref()
168            .ok_or_else(|| Self::db_err("not initialized"))?;
169        let sql = format!(
170            "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
171            self.table_name
172        );
173        let mut stmt = conn
174            .prepare_cached(&sql)
175            .map_err(|e| Self::db_err(format!("prepare failed: {e}")))?;
176
177        for sqllog in sqllogs {
178            let meta = sqllog.parse_meta();
179            let pm = sqllog.parse_performance_metrics();
180            let ind = sqllog.parse_indicators();
181            let (exec_time, row_count, exec_id) = ind.map_or((None, None, None), |i| {
182                (Some(i.exectime), Some(i.rowcount), Some(i.exec_id))
183            });
184
185            stmt.execute(params![
186                sqllog.ts.as_ref(),
187                meta.ep,
188                meta.sess_id.as_ref(),
189                meta.thrd_id.as_ref(),
190                meta.username.as_ref(),
191                meta.trxid.as_ref(),
192                meta.statement.as_ref(),
193                meta.appname.as_ref(),
194                strip_ip_prefix(meta.client_ip.as_ref()),
195                sqllog.tag.as_deref(),
196                pm.sql.as_ref(),
197                exec_time,
198                row_count,
199                exec_id
200            ])
201            .map_err(|e| Self::db_err(format!("insert failed: {e}")))?;
202        }
203
204        self.stats.record_success_batch(sqllogs.len());
205        Ok(())
206    }
207
208    fn finalize(&mut self) -> Result<()> {
209        if let Some(conn) = &self.conn {
210            conn.execute_batch("COMMIT;")
211                .map_err(|e| Self::db_err(format!("commit failed: {e}")))?;
212        }
213        info!(
214            "SQLite export finished: {} (success: {}, failed: {})",
215            self.database_url, self.stats.exported, self.stats.failed
216        );
217        Ok(())
218    }
219
220    fn name(&self) -> &'static str {
221        "SQLite"
222    }
223
224    fn stats_snapshot(&self) -> Option<ExportStats> {
225        Some(self.stats.clone())
226    }
227}