use super::*;
use dm_database_parser_sqllog::LogParserBuilder;
fn write_test_log(path: &std::path::Path, count: usize) {
use std::fmt::Write as _;
let mut buf = String::with_capacity(count * 170);
for i in 0..count {
writeln!(
buf,
"2025-01-15 10:30:28.001 (EP[0] sess:0x{i:04x} user:TESTUSER trxid:{i} stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT * FROM t WHERE id={i}. EXECTIME: {exec}(ms) ROWCOUNT: {rows}(rows) EXEC_ID: {i}.",
exec = (i * 13) % 1000,
rows = i % 100,
).unwrap();
}
std::fs::write(path, buf).unwrap();
}
#[test]
fn test_sqlite_basic_export() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("out.db");
write_test_log(&logfile, 5);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"sqllog_records".into(),
true,
false,
);
exporter.initialize().unwrap();
for r in &records {
exporter.export_one_normalized(r, None).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM sqllog_records", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 5);
}
#[test]
fn test_sqlite_overwrite_drops_existing_table() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("out.db");
write_test_log(&logfile, 3);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut e =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), false, false);
e.initialize().unwrap();
for r in &records {
e.export_one_normalized(r, None).unwrap();
}
e.finalize().unwrap();
}
{
let mut e = SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), true, false);
e.initialize().unwrap();
for r in &records {
e.export_one_normalized(r, None).unwrap();
}
e.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 3);
}
#[test]
fn test_sqlite_with_normalized() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("out.db");
write_test_log(&logfile, 2);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
let normalized: Vec<Option<String>> = records
.iter()
.map(|_| Some("SELECT * FROM t WHERE id=?".into()))
.collect();
{
let mut exporter =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), true, false);
exporter.normalize = true;
exporter.initialize().unwrap();
for (r, ns) in records.iter().zip(normalized.iter()) {
exporter.export_one_normalized(r, ns.as_deref()).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let ns: Option<String> = conn
.query_row("SELECT normalized_sql FROM tbl LIMIT 1", [], |r| r.get(0))
.unwrap();
assert_eq!(ns, Some("SELECT * FROM t WHERE id=?".to_string()));
}
#[test]
fn test_sqlite_from_config() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("cfg.db");
let cfg = crate::config::SqliteExporterConfig {
database_url: dbfile.to_string_lossy().into_owned(),
table_name: "records".to_string(),
overwrite: true,
append: false,
batch_size: 42_000,
};
let mut exporter = SqliteExporter::from_config(&cfg);
assert_eq!(
exporter.batch_size, cfg.batch_size,
"from_config must map batch_size correctly"
);
exporter.initialize().unwrap();
exporter.finalize().unwrap();
assert!(dbfile.exists());
}
#[test]
fn test_sqlite_export_method() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("export.db");
write_test_log(&logfile, 3);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut exporter =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), true, false);
exporter.initialize().unwrap();
for r in &records {
exporter.export(r).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 3);
}
#[test]
fn test_sqlite_export_one_preparsed() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("preparsed.db");
write_test_log(&logfile, 2);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut exporter =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), true, false);
exporter.initialize().unwrap();
for r in &records {
exporter.export_one_preparsed(r, true, None).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 2);
}
#[test]
fn test_sqlite_stats_snapshot() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("stats.db");
write_test_log(&logfile, 4);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
let mut exporter =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), true, false);
exporter.initialize().unwrap();
for r in &records {
exporter.export(r).unwrap();
}
let snap = exporter.stats_snapshot().unwrap();
assert_eq!(snap.exported, 4);
exporter.finalize().unwrap();
}
#[test]
fn test_sqlite_debug_format() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("debug.db");
let exporter = SqliteExporter::new(
dbfile.to_string_lossy().into_owned(),
"my_table".to_string(),
true,
false,
);
let s = format!("{exporter:?}");
assert!(
s.contains("SqliteExporter"),
"Debug output should contain struct name"
);
assert!(
s.contains("my_table"),
"Debug output should contain table_name, got: {s}"
);
assert!(
s.contains("database_url"),
"Debug output should contain database_url field, got: {s}"
);
}
#[test]
fn test_sqlite_field_order() {
use crate::pipeline::FieldMask;
let dir = tempfile::TempDir::new().unwrap();
let log = dir.path().join("t.log");
std::fs::write(
&log,
"2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:testuser trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT 42. EXECTIME: 5(ms) ROWCOUNT: 2(rows) EXEC_ID: 99.\n",
)
.unwrap();
let db = dir.path().join("out.db");
{
let mut exporter = SqliteExporter::new(
db.to_str().unwrap().to_string(),
"records".to_string(),
true,
false,
);
exporter.normalize = false;
exporter.field_mask =
FieldMask::from_names(&["sql".to_string(), "username".to_string()]).unwrap();
exporter.ordered_indices = vec![10, 4]; exporter.initialize().unwrap();
let parser = LogParserBuilder::new(log.to_str().unwrap())
.build()
.unwrap();
for record in parser.iter().flatten() {
exporter.export(&record).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&db).unwrap();
let (sql_val, username_val): (String, String) = conn
.query_row("SELECT sql, username FROM records", [], |row| {
Ok((row.get(0)?, row.get(1)?))
})
.unwrap();
assert!(sql_val.contains("SELECT 42"), "sql_val: {sql_val}");
assert_eq!(username_val, "testuser");
}
#[test]
fn test_sqlite_append_mode() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("append.db");
write_test_log(&logfile, 3);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut e =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), false, false);
e.initialize().unwrap();
for r in &records {
e.export(r).unwrap();
}
e.finalize().unwrap();
}
{
let mut e = SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), false, true);
e.initialize().unwrap();
for r in &records {
e.export(r).unwrap();
}
e.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 6);
}
#[test]
fn test_sqlite_initialize_creates_quoted_table() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("quoted.db");
{
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"my_records".to_string(),
true,
false,
);
exporter.initialize().unwrap();
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let create_stmt: String = conn
.query_row(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='my_records'",
[],
|r| r.get(0),
)
.unwrap();
assert!(
create_stmt.contains("\"my_records\""),
"table name should be double-quoted; actual: {create_stmt}"
);
}
#[test]
fn test_sqlite_initialize_silent_when_table_missing() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("missing_tbl.db");
{
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"fresh_tbl".to_string(),
false,
false,
);
exporter
.initialize()
.expect("initialize should silently succeed when table missing");
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='fresh_tbl'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(count, 1, "table fresh_tbl should be created");
}
#[test]
fn test_sqlite_initialize_clears_existing_table_via_delete() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("test.log");
let dbfile = dir.path().join("clear.db");
write_test_log(&logfile, 4);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut e = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"clr_tbl".into(),
false,
false,
);
e.initialize().unwrap();
for r in &records {
e.export(r).unwrap();
}
e.finalize().unwrap();
}
{
let mut e = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"clr_tbl".into(),
false,
false,
);
e.initialize().unwrap();
for r in &records {
e.export(r).unwrap();
}
e.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM clr_tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(
count, 4,
"DELETE FROM should clear previous rows; got {count}"
);
}
#[test]
fn test_sqlite_batch_commit() {
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("batch.log");
let dbfile = dir.path().join("batch.db");
write_test_log(&logfile, 5);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut exporter =
SqliteExporter::new(dbfile.to_string_lossy().into(), "tbl".into(), true, false);
exporter.batch_size = 2;
exporter.initialize().unwrap();
for r in &records {
exporter.export_one_normalized(r, None).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(
count, 5,
"5 条记录经过批量提交后必须全部持久化,实际: {count}"
);
}
#[test]
fn test_sqlite_export_without_initialize_returns_err() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("no_init.db");
let logfile = dir.path().join("t.log");
std::fs::write(
&logfile,
"2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
)
.unwrap();
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"tbl".to_string(),
true,
false,
);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
for record in parser.iter().flatten() {
let result = exporter.export(&record);
assert!(result.is_err(), "未调用 initialize() 时 export 应返回 Err");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("not initialized"),
"错误消息应含 'not initialized',实际: {err_msg}"
);
}
}
#[test]
fn test_sqlite_export_one_normalized_without_initialize_returns_err() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("no_init2.db");
let logfile = dir.path().join("t.log");
std::fs::write(
&logfile,
"2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
)
.unwrap();
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"tbl".to_string(),
true,
false,
);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
for record in parser.iter().flatten() {
let result = exporter.export_one_normalized(&record, Some("SELECT 1"));
assert!(
result.is_err(),
"未初始化时 export_one_normalized 应返回 Err"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("not initialized"),
"错误消息应含 'not initialized',实际: {err_msg}"
);
}
}
#[test]
fn test_sqlite_initialize_succeeds_and_creates_db() {
let dir = tempfile::TempDir::new().unwrap();
let dbfile = dir.path().join("pragma.db");
{
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"tbl".to_string(),
true,
false,
);
let result = exporter.initialize();
assert!(
result.is_ok(),
"initialize() 应成功(initialize_pragmas 已正确执行),实际: {result:?}"
);
exporter.finalize().unwrap();
}
assert!(dbfile.exists(), "DB 文件应在 initialize 后存在");
assert!(
dbfile.metadata().unwrap().len() > 0,
"DB 文件不应为空(至少含 SQLite header)"
);
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let table_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='tbl'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(
table_count, 1,
"initialize 应创建目标表 'tbl',实际: {table_count}"
);
}
#[test]
fn test_sqlite_projection_subset_export() {
use crate::pipeline::FieldMask;
let dir = tempfile::TempDir::new().unwrap();
let logfile = dir.path().join("t.log");
let dbfile = dir.path().join("proj.db");
write_test_log(&logfile, 3);
let parser = LogParserBuilder::new(logfile.to_str().unwrap())
.build()
.unwrap();
let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
{
let mut exporter = SqliteExporter::new(
dbfile.to_string_lossy().into(),
"proj_tbl".to_string(),
true,
false,
);
exporter.normalize = false;
exporter.field_mask =
FieldMask::from_names(&["ts".to_string(), "username".to_string(), "sql".to_string()])
.unwrap();
exporter.ordered_indices = vec![0, 4, 10]; exporter.initialize().unwrap();
for r in &records {
exporter.export(r).unwrap();
}
exporter.finalize().unwrap();
}
let conn = rusqlite::Connection::open(&dbfile).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM proj_tbl", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 3, "应插入 3 条记录,实际: {count}");
let col_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pragma_table_info('proj_tbl')",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(col_count, 3, "投影后表应有 3 列,实际: {col_count}");
}