use super::collector;
use super::orchestrator::handle_run;
use crate::config::Config;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
/// With `include_performance_metrics = false` under `[exporter.csv]`, the CSV
/// header must not mention `exec_time_ms`, `row_count` or `exec_id`, while it
/// still mentions `sql`.
#[test]
fn test_include_performance_metrics_false_csv_excludes_pm_columns() {
    /// Renders a path with backslashes turned into forward slashes before it
    /// is embedded in the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();

    // One well-formed record is enough to make the exporter emit a header.
    std::fs::write(
        root.join("t.log"),
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let csv_path = root.join("out.csv");
    let cfg_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\ninclude_performance_metrics = false\n",
        logdir = slashed(root),
        errlog = slashed(&root.join("errors.log")),
        applog = slashed(&root.join("app.log")),
        csv = slashed(&csv_path),
    );
    let cfg: Config = toml::from_str(&cfg_text).unwrap();

    let interrupted = Arc::new(AtomicBool::new(false));
    handle_run(&cfg, true, false, &interrupted, None).unwrap();

    // Only the first line (the header) is inspected.
    let content = std::fs::read_to_string(&csv_path).unwrap();
    let header = content.lines().next().unwrap();
    assert!(
        !header.contains("exec_time_ms"),
        "header should skip exec_time_ms: {header}"
    );
    assert!(
        !header.contains("row_count"),
        "header should skip row_count: {header}"
    );
    assert!(
        !header.contains("exec_id"),
        "header should skip exec_id: {header}"
    );
    assert!(header.contains("sql"), "sql column should remain: {header}");
}
/// `handle_run` returns `Ok` for a minimal CSV-exporter configuration and a
/// single well-formed log record.
#[test]
fn test_handle_run_default_config_succeeds() {
    /// Renders a path with backslashes turned into forward slashes before it
    /// is embedded in the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();
    std::fs::write(
        root.join("t.log"),
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let cfg_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
        logdir = slashed(root),
        errlog = slashed(&root.join("errors.log")),
        applog = slashed(&root.join("app.log")),
        csv = slashed(&root.join("out.csv")),
    );
    let cfg: Config = toml::from_str(&cfg_text).unwrap();

    let interrupted = Arc::new(AtomicBool::new(false));
    let result = handle_run(&cfg, true, false, &interrupted, None);
    assert!(result.is_ok(), "handle_run 应在默认配置时成功: {result:?}");
}
/// With a `[filter]` section enabled for username `U`, the record written by
/// user `U` still shows up in the CSV output.
#[test]
fn test_filter_path() {
    /// Renders a path with backslashes turned into forward slashes before it
    /// is embedded in the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();
    std::fs::write(
        root.join("t.log"),
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let csv_path = root.join("out.csv");
    let cfg_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[filter]\nenable = true\nusernames = [\"U\"]\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
        logdir = slashed(root),
        errlog = slashed(&root.join("errors.log")),
        applog = slashed(&root.join("app.log")),
        csv = slashed(&csv_path),
    );
    let cfg: Config = toml::from_str(&cfg_text).unwrap();

    let interrupted = Arc::new(AtomicBool::new(false));
    handle_run(&cfg, true, false, &interrupted, None).unwrap();

    let content = std::fs::read_to_string(&csv_path).unwrap();
    assert!(
        content.contains("SELECT 1"),
        "filtered record should appear in output: {content}"
    );
}
/// Runs `handle_run` once over a directory with one log file (last argument
/// `None`) and once over a directory with two log files (last argument
/// `Some(2)`), then checks that the second CSV has exactly one more line.
#[test]
fn test_parallel_merge_consistent() {
    /// Renders a path with backslashes turned into forward slashes before it
    /// is embedded in the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();
    let log_line = "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT id FROM orders WHERE user_id = 42. EXECTIME: 5(ms) ROWCOUNT: 3(rows) EXEC_ID: 1.\n";

    // Both runs share the same error-log and application-log locations.
    let errlog_text = slashed(&root.join("errors.log"));
    let applog_text = slashed(&root.join("app.log"));
    let make_cfg_dir = |input_dir: &std::path::Path, csv_file: &str| -> Config {
        let cfg_text = format!(
            "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
            logdir = slashed(input_dir),
            errlog = errlog_text.as_str(),
            applog = applog_text.as_str(),
            csv = csv_file,
        );
        toml::from_str::<Config>(&cfg_text).unwrap()
    };
    let count_lines = |file: &str| std::fs::read_to_string(file).unwrap().lines().count();

    // First run: a single input file.
    let seq_dir = root.join("seq");
    std::fs::create_dir(&seq_dir).unwrap();
    std::fs::write(seq_dir.join("only.log"), log_line).unwrap();
    let csv_seq = slashed(&root.join("out_seq.csv"));
    let cfg_seq = make_cfg_dir(&seq_dir, &csv_seq);
    let seq_flag = Arc::new(AtomicBool::new(false));
    let result_seq = handle_run(&cfg_seq, true, false, &seq_flag, None);
    assert!(result_seq.is_ok(), "顺序路径应成功: {result_seq:?}");

    // Second run: two input files, each holding the same single record.
    let par_dir = root.join("par");
    std::fs::create_dir(&par_dir).unwrap();
    ["a.log", "b.log"]
        .iter()
        .for_each(|name| std::fs::write(par_dir.join(name), log_line).unwrap());
    let csv_par = slashed(&root.join("out_par.csv"));
    let cfg_par = make_cfg_dir(&par_dir, &csv_par);
    let par_flag = Arc::new(AtomicBool::new(false));
    let result_par = handle_run(&cfg_par, true, false, &par_flag, Some(2));
    assert!(result_par.is_ok(), "并行路径应成功: {result_par:?}");

    let seq_lines = count_lines(&csv_seq);
    let par_lines = count_lines(&csv_par);
    assert_eq!(
        par_lines,
        seq_lines + 1,
        "并行路径(2 个文件)应比顺序路径(1 个文件)多 1 条数据行"
    );
}
/// Exports the same nine log lines to SQLite twice — once from a single
/// concatenated file and once from three separate files — and checks that the
/// `(exec_id, sql, normalized_sql)` rows read back from both databases match.
///
/// NOTE(review): both `handle_run` calls pass `None` as the last argument,
/// whereas `test_parallel_merge_consistent` passes `Some(2)` for its
/// multi-file run. Confirm against `handle_run`'s signature whether the
/// three-file run here is meant to pass an explicit value as well.
#[test]
fn test_sqlite_parallel_matches_sequential() {
    /// Renders a path with backslashes turned into forward slashes before it
    /// is embedded in the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();
    let errlog_text = slashed(&root.join("errors.log"));
    let applog_text = slashed(&root.join("app.log"));

    // Three sessions; each has a plain INSERT, a PARAMS line and a
    // parameterised INSERT.
    let log_a = "\
        2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [INS] INSERT INTO t VALUES (1). EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 100.\n\
        2025-01-15 10:30:28.010 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x2 appname:A ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'alice')}\n\
        2025-01-15 10:30:28.011 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x2 appname:A ip:10.0.0.1) [INS] INSERT INTO t(name) VALUES (?). EXECTIME: 2(ms) ROWCOUNT: 1(rows) EXEC_ID: 101.\n";
    let log_b = "\
        2025-01-15 10:30:28.001 (EP[0] sess:0x0002 user:U trxid:2 stmt:0x3 appname:A ip:10.0.0.1) [INS] INSERT INTO t VALUES (2). EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 200.\n\
        2025-01-15 10:30:28.010 (EP[0] sess:0x0002 user:U trxid:2 stmt:0x4 appname:A ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'bob')}\n\
        2025-01-15 10:30:28.011 (EP[0] sess:0x0002 user:U trxid:2 stmt:0x4 appname:A ip:10.0.0.1) [INS] INSERT INTO t(name) VALUES (?). EXECTIME: 2(ms) ROWCOUNT: 1(rows) EXEC_ID: 201.\n";
    let log_c = "\
        2025-01-15 10:30:28.001 (EP[0] sess:0x0003 user:U trxid:3 stmt:0x5 appname:A ip:10.0.0.1) [INS] INSERT INTO t VALUES (3). EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 300.\n\
        2025-01-15 10:30:28.010 (EP[0] sess:0x0003 user:U trxid:3 stmt:0x6 appname:A ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'carol')}\n\
        2025-01-15 10:30:28.011 (EP[0] sess:0x0003 user:U trxid:3 stmt:0x6 appname:A ip:10.0.0.1) [INS] INSERT INTO t(name) VALUES (?). EXECTIME: 2(ms) ROWCOUNT: 1(rows) EXEC_ID: 301.\n";

    // Multi-file input: one file per session.
    let par_dir = root.join("par");
    std::fs::create_dir(&par_dir).unwrap();
    std::fs::write(par_dir.join("a.log"), log_a).unwrap();
    std::fs::write(par_dir.join("b.log"), log_b).unwrap();
    std::fs::write(par_dir.join("c.log"), log_c).unwrap();

    // Single-file input: the three sessions concatenated.
    let seq_dir = root.join("seq");
    std::fs::create_dir(&seq_dir).unwrap();
    std::fs::write(seq_dir.join("all.log"), format!("{log_a}{log_b}{log_c}")).unwrap();

    let make_cfg = |input_dir: &std::path::Path, db_path: &str| -> Config {
        let cfg_text = format!(
            "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.sqlite]\ndatabase_url = \"{db}\"\ntable_name = \"sqllog\"\noverwrite = true\nappend = false\nbatch_size = 1000\n",
            logdir = slashed(input_dir),
            errlog = errlog_text.as_str(),
            applog = applog_text.as_str(),
            db = db_path,
        );
        toml::from_str::<Config>(&cfg_text).unwrap()
    };

    let seq_db = slashed(&root.join("seq.db"));
    let par_db = slashed(&root.join("par.db"));

    let seq_cfg = make_cfg(&seq_dir, &seq_db);
    let seq_flag = Arc::new(AtomicBool::new(false));
    handle_run(&seq_cfg, true, false, &seq_flag, None).unwrap();

    let par_cfg = make_cfg(&par_dir, &par_db);
    let par_flag = Arc::new(AtomicBool::new(false));
    handle_run(&par_cfg, true, false, &par_flag, None).unwrap();

    // Rows are read back in a fixed order so the two vectors can be compared directly.
    let read_rows = |db_file: &str| {
        let conn = rusqlite::Connection::open(db_file).unwrap();
        let mut stmt = conn
            .prepare("SELECT exec_id, sql, normalized_sql FROM sqllog ORDER BY exec_id, sql")
            .unwrap();
        let mapped = stmt
            .query_map([], |row| {
                let exec_id = row.get::<_, Option<i64>>(0)?;
                let sql = row.get::<_, String>(1)?;
                let normalized = row.get::<_, Option<String>>(2)?;
                Ok((exec_id, sql, normalized))
            })
            .unwrap();
        let rows = mapped.collect::<rusqlite::Result<Vec<_>>>().unwrap();
        rows
    };
    let seq_rows = read_rows(&seq_db);
    let par_rows = read_rows(&par_db);
    assert_eq!(
        seq_rows, par_rows,
        "并行 SQLite 输出与顺序模式记录集合应一致"
    );
    assert!(std::fs::metadata(&par_db).unwrap().len() > 0);
}
/// Feeds a PARAMS record through `normalize_and_export` and checks that the
/// call returns `ExportAction::Continue`, leaves `records_in_file` at zero,
/// and stores an entry in the parameter buffer under `(sess_id, statement)`.
///
/// NOTE(review): the flags are positional; the assertion messages suggest the
/// final `false` is the "passes" flag and one of the leading `true`s enables
/// normalisation — confirm against `normalize_and_export`'s signature.
#[test]
fn test_normalize_and_export_filtered_params_updates_buffer() {
    use super::processor::{ExportAction, normalize_and_export};
    use crate::error::ErrorStats;
    use crate::exporter::{CsvExporter, ExporterManager};
    use crate::pipeline::normalizer::ParamBuffer;
    use dm_database_parser_sqllog::Sqllog;

    let workdir = tempfile::TempDir::new().unwrap();
    let csv_path = workdir.path().join("out.csv");
    let mut manager = ExporterManager::from_csv(CsvExporter::new(&csv_path));
    manager.initialize().unwrap();

    // A record whose SQL text is a PARAMS line rather than a statement.
    let record = Sqllog {
        ts: String::from("2024-01-01 00:00:00.000"),
        tag: None,
        ep: 0,
        sess_id: String::from("sess_gap1"),
        thrd_id: String::from("t1"),
        username: String::from("usr"),
        trxid: String::from("tx1"),
        statement: String::from("stmt_gap1"),
        appname: String::from("app"),
        client_ip: String::from("127.0.0.1"),
        sql: String::from("PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'hello')}"),
        exectime: 0.0,
        rowcount: 0,
        exec_id: 0,
    };

    let mut params_buffer: ParamBuffer = ParamBuffer::new();
    let mut ns_scratch = Vec::<u8>::new();
    let mut records_in_file = 0_usize;
    let mut file_stats = ErrorStats::default();

    let action = normalize_and_export(
        &record,
        &mut manager,
        true,
        true,
        &mut params_buffer,
        None,
        &mut ns_scratch,
        None,
        &mut records_in_file,
        &mut file_stats,
        "test_file.log",
        false,
    );

    assert!(
        matches!(action, ExportAction::Continue),
        "passes=false 路径应返回 Continue"
    );
    assert_eq!(
        records_in_file, 0,
        "passes=false 时 records_in_file 应保持为 0,实际为 {records_in_file}"
    );

    let expected_key = (String::from("sess_gap1"), String::from("stmt_gap1"));
    let key_present = params_buffer.contains_key(&expected_key);
    assert!(
        key_present,
        "passes=false+do_normalize=true 下 PARAMS 记录应写入 params_buffer,\
         但 key ({:?}) 不存在; buffer keys={:?}",
        expected_key,
        params_buffer.keys().collect::<Vec<_>>()
    );
}
/// Calls `normalize_and_export` with a `Some(0)` quota argument and checks
/// that it answers `ExportAction::BreakQuota` without incrementing
/// `records_in_file`.
#[test]
fn test_normalize_and_export_quota_hit_returns_break_quota() {
    use super::processor::{ExportAction, normalize_and_export};
    use crate::error::ErrorStats;
    use crate::exporter::{CsvExporter, ExporterManager};
    use crate::pipeline::normalizer::ParamBuffer;
    use dm_database_parser_sqllog::Sqllog;

    let workdir = tempfile::TempDir::new().unwrap();
    let csv_path = workdir.path().join("out.csv");
    let csv_exporter = CsvExporter::new(&csv_path);
    let mut manager = ExporterManager::from_csv(csv_exporter);
    manager.initialize().unwrap();

    // An ordinary SELECT record.
    let record = Sqllog {
        ts: String::from("2024-01-01 00:00:00.000"),
        tag: Some(String::from("SEL")),
        ep: 0,
        sess_id: String::from("sess_gap2"),
        thrd_id: String::from("t2"),
        username: String::from("usr"),
        trxid: String::from("tx2"),
        statement: String::from("stmt_gap2"),
        appname: String::from("app"),
        client_ip: String::from("127.0.0.1"),
        sql: String::from("SELECT 1"),
        exectime: 1.0,
        rowcount: 1,
        exec_id: 42,
    };

    let mut params_buffer: ParamBuffer = ParamBuffer::new();
    let mut ns_scratch = Vec::<u8>::new();
    let mut records_in_file = 0_usize;
    let mut file_stats = ErrorStats::default();

    let action = normalize_and_export(
        &record,
        &mut manager,
        true,
        false,
        &mut params_buffer,
        None,
        &mut ns_scratch,
        Some(0),
        &mut records_in_file,
        &mut file_stats,
        "test_file.log",
        true,
    );

    let hit_quota = matches!(action, ExportAction::BreakQuota);
    assert!(
        hit_quota,
        "remaining=Some(0) 且 passes=true 应返回 BreakQuota,\
         但得到了 Continue 或 BreakFatal"
    );
    assert_eq!(
        records_in_file, 0,
        "BreakQuota 路径下 records_in_file 应保持为 0,实际为 {records_in_file}"
    );
}
/// `make_progress_bar(true, 3)` yields a bar of length 3 positioned at 0;
/// setting a message and finishing it afterwards must not panic.
#[test]
fn test_progress_bar_template() {
    let maybe_bar = super::input::make_progress_bar(true, 3);
    assert!(
        maybe_bar.is_some(),
        "show_progress=true 应返回 Some(ProgressBar)"
    );

    let bar = maybe_bar.unwrap();
    assert_eq!(
        bar.length(),
        Some(3),
        "length() 应为 Some(3),实际为 {:?}",
        bar.length()
    );
    assert_eq!(
        bar.position(),
        0,
        "初始 position() 应为 0,实际为 {}",
        bar.position()
    );

    bar.set_message("test message");
    bar.finish_and_clear();
}
/// `make_progress_bar(false, _)` yields no progress bar.
#[test]
fn test_progress_bar_disabled() {
    let maybe_bar = super::input::make_progress_bar(false, 3);
    let absent = maybe_bar.is_none();
    assert!(absent, "show_progress=false 应返回 None,实际返回了 Some");
}
/// When the input contains an unparseable line, `handle_run` leaves behind an
/// error-log file whose text includes `[ERROR] line ` and `reason:`.
#[test]
fn test_error_log_written() {
    /// Renders a path with backslashes turned into forward slashes before it
    /// is embedded in the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();

    // First line is garbage, second line is a well-formed record.
    std::fs::write(
        root.join("t.log"),
        "garbage line that cannot be parsed\n2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let error_log = root.join("errors.log");
    let cfg_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
        logdir = slashed(root),
        errlog = slashed(&error_log),
        applog = slashed(&root.join("app.log")),
        csv = slashed(&root.join("out.csv")),
    );
    let cfg: Config = toml::from_str(&cfg_text).unwrap();

    let interrupted = Arc::new(AtomicBool::new(false));
    handle_run(&cfg, true, false, &interrupted, None).unwrap();

    assert!(
        error_log.exists(),
        "error log 文件应在有解析错误时被写出,但未找到: {}",
        error_log.display()
    );

    let content = std::fs::read_to_string(&error_log).unwrap();
    assert!(
        content.contains("[ERROR] line "),
        "error log 应含有 '[ERROR] line ',实际内容: {content}"
    );
    assert!(
        content.contains("reason:"),
        "error log 应含有 'reason:',实际内容: {content}"
    );
}
/// Builds `ErrorStats` holding three `EncodingError`s, checks the per-kind
/// counter, and then calls `print_run_summary` with it (which must not panic).
#[test]
fn test_hint_output() {
    use crate::error::{ErrorKind, ErrorStats};
    use std::collections::HashMap;

    let by_type = HashMap::from([(ErrorKind::EncodingError, 3u64)]);
    let stats = ErrorStats {
        total_errors: 3,
        parse_errors: 3,
        by_type,
        ..Default::default()
    };

    let encoding_errors = stats
        .by_type
        .get(&ErrorKind::EncodingError)
        .map_or(0, |count| *count);
    assert_eq!(encoding_errors, 3, "by_type[EncodingError] 应为 3");

    super::summary::print_run_summary(false, false, false, 0.1, &[], 0, 0, &stats);
}
/// With `append_error_log = false`, `write_error_log` replaces whatever the
/// error-log file held before: the old marker text is gone and a fresh
/// `[ERROR] line ` entry is present.
#[test]
fn test_write_error_log_run_still_truncates() {
    use crate::config::ErrorLogConfig;
    use crate::error::{ErrorKind, ErrorStats, ParseErrorRecord};

    // The temp-file handle is kept alive until the end of the test.
    let tmp_file = tempfile::NamedTempFile::new().expect("failed to create tempfile");
    let tmp_path = tmp_file.path().to_string_lossy().into_owned();
    std::fs::write(&tmp_path, b"OLD CONTENT\n").expect("failed to write old content");

    let error_cfg = ErrorLogConfig {
        file: tmp_path.clone(),
    };
    let cfg = Config {
        error: Some(error_cfg),
        append_error_log: false,
        ..Config::default()
    };

    let bad_record = ParseErrorRecord {
        line_number: 1,
        raw_truncated: String::from("bad line"),
        kind: ErrorKind::ParseFailed,
    };
    let stats = ErrorStats {
        parse_errors: 1,
        total_errors: 1,
        parse_error_records: vec![bad_record],
        ..ErrorStats::default()
    };

    super::error_log::write_error_log(&cfg, &stats);

    let content = std::fs::read_to_string(&tmp_path).expect("failed to read error log");
    assert!(
        !content.contains("OLD CONTENT"),
        "append_error_log=false 时旧内容应被截断,实际内容: {content}"
    );
    assert!(
        content.contains("[ERROR] line "),
        "error log 应含有新写入的 [ERROR] 行,实际内容: {content}"
    );
}
/// With `append_error_log = true`, `write_error_log` keeps the existing file
/// text and the file additionally contains a new `[ERROR] line ` entry.
#[test]
fn test_write_error_log_watch_appends() {
    use crate::config::ErrorLogConfig;
    use crate::error::{ErrorKind, ErrorStats, ParseErrorRecord};

    // The temp-file handle is kept alive until the end of the test.
    let tmp = tempfile::NamedTempFile::new().expect("failed to create tempfile");
    let path = tmp.path().to_string_lossy().into_owned();
    std::fs::write(&path, b"EXISTING\n").expect("failed to write existing content");

    let error_cfg = ErrorLogConfig { file: path.clone() };
    let cfg = Config {
        error: Some(error_cfg),
        append_error_log: true,
        ..Config::default()
    };

    let bad_record = ParseErrorRecord {
        line_number: 1,
        raw_truncated: String::from("bad"),
        kind: ErrorKind::ParseFailed,
    };
    let stats = ErrorStats {
        parse_errors: 1,
        total_errors: 1,
        parse_error_records: vec![bad_record],
        ..ErrorStats::default()
    };

    super::error_log::write_error_log(&cfg, &stats);

    let content = std::fs::read_to_string(&path).expect("failed to read error log");
    assert!(
        content.contains("EXISTING"),
        "append_error_log=true 时旧内容应被保留(追加模式),实际内容: {content}"
    );
    assert!(
        content.contains("[ERROR] line "),
        "新错误行应追加到文件末尾,实际内容: {content}"
    );
}
/// Smoke test: `print_run_summary` accepts stats carrying parse errors and
/// filtered-out records without panicking.
#[test]
fn test_run_summary() {
    use crate::error::{ErrorKind, ErrorStats};
    use std::collections::HashMap;

    let by_type = HashMap::from([(ErrorKind::EncodingError, 2u64)]);
    let stats = ErrorStats {
        total_errors: 2,
        parse_errors: 2,
        filtered_out: 5,
        by_type,
        ..Default::default()
    };

    super::summary::print_run_summary(false, false, false, 1.5, &[], 10, 0, &stats);
}
/// Test-only processor whose `process` returns `false` for every record.
#[derive(Debug)]
struct AlwaysFail;

impl crate::pipeline::LogProcessor for AlwaysFail {
    /// Ignores the record and always answers `false`.
    fn process(&self, _record: &dm_database_parser_sqllog::Sqllog) -> bool {
        false
    }
}
/// `collect_log_file` on a path that does not exist returns an error matching
/// `Error::Parser(ParserError::InvalidPath { .. })`.
#[test]
fn test_collector_invalid_path_returns_error() {
    use crate::error::{Error, ParserError};
    use crate::pipeline::Pipeline;
    use std::path::Path;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let missing = Path::new("/nonexistent/absolutely/not/there.log");
    let pipeline = Pipeline::default();
    let interrupted = Arc::new(AtomicBool::new(false));

    let result = collector::collect_log_file(missing, &pipeline, false, None, &interrupted);
    assert!(result.is_err(), "不存在路径应返回 Err,实际: {result:?}");

    let err = result.unwrap_err();
    assert!(
        matches!(err, Error::Parser(ParserError::InvalidPath { .. })),
        "应匹配 ParserError::InvalidPath"
    );
}
/// A file made only of unparseable lines yields no rows from
/// `collect_log_file` and a non-zero `parse_errors` count.
#[test]
fn test_collector_parse_error_accumulation() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let workdir = tempfile::TempDir::new().unwrap();
    let bad_log = workdir.path().join("bad.log");
    std::fs::write(&bad_log, "not a valid log line\nalso invalid\n").unwrap();

    let pipeline = Pipeline::default();
    let interrupted = Arc::new(AtomicBool::new(false));
    let outcome = collector::collect_log_file(&bad_log, &pipeline, false, None, &interrupted);
    let (rows, stats) = outcome.expect("collect_log_file 应不返回 Err");

    assert!(
        rows.is_empty(),
        "全部非法行应不产生记录,实际 rows.len()={}",
        rows.len()
    );
    assert!(
        stats.parse_errors > 0,
        "应记录至少 1 条解析错误,实际 parse_errors={}",
        stats.parse_errors
    );
}
/// With a pipeline whose only processor is `AlwaysFail`, a valid record is
/// dropped and `collect_log_file` returns no rows.
#[test]
fn test_collector_not_needed_filtering() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let workdir = tempfile::TempDir::new().unwrap();
    let dml_log = workdir.path().join("dml.log");
    let valid_dml = "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:TESTUSER trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT id FROM t. EXECTIME: 5(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n";
    std::fs::write(&dml_log, valid_dml).unwrap();

    let mut pipeline = Pipeline::new();
    pipeline.add(Box::new(AlwaysFail));
    assert!(!pipeline.is_empty(), "AlwaysFail 添加后 pipeline 应非空");

    let interrupted = Arc::new(AtomicBool::new(false));
    let outcome = collector::collect_log_file(&dml_log, &pipeline, false, None, &interrupted);
    let (rows, _parse_errors) = outcome.expect("collect_log_file 应 Ok");

    assert!(
        rows.is_empty(),
        "AlwaysFail 过滤所有记录,rows 应为空,实际 {}",
        rows.len()
    );
}
/// Same as the plain filtering test, but the input is a PARAMS line and the
/// third argument of `collect_log_file` is `true`; the result must still be
/// an empty row set.
#[test]
fn test_collector_filtered_params_normalize() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let workdir = tempfile::TempDir::new().unwrap();
    let params_log = workdir.path().join("params.log");
    let params_line = "2025-01-15 10:30:28.010 (EP[0] sess:0x0001 user:TESTUSER trxid:1 stmt:0x2 appname:App ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'testvalue')}\n";
    std::fs::write(&params_log, params_line).unwrap();

    let mut pipeline = Pipeline::new();
    pipeline.add(Box::new(AlwaysFail));

    let interrupted = Arc::new(AtomicBool::new(false));
    let outcome = collector::collect_log_file(&params_log, &pipeline, true, None, &interrupted);
    let (rows, _parse_errors) = outcome.expect("collect_log_file 应 Ok");

    assert!(
        rows.is_empty(),
        "AlwaysFail 过滤 PARAMS 记录,rows 应为空,实际 {}",
        rows.len()
    );
}
/// When the interrupt flag is already `true` on entry, `collect_log_file`
/// returns `Ok` with no rows even though the file holds a valid record.
#[test]
fn test_collector_interrupted_returns_empty() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let dir = tempfile::TempDir::new().unwrap();
    let log_path = dir.path().join("data.log");
    let valid_dml = "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:TESTUSER trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT id FROM t. EXECTIME: 5(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n";
    std::fs::write(&log_path, valid_dml).unwrap();

    let pipeline = Pipeline::default();
    // The flag is constructed already set; a follow-up `store(true, ..)` would
    // be a no-op, so none is issued.
    let interrupted = Arc::new(AtomicBool::new(true));

    let (rows, _stats) =
        collector::collect_log_file(&log_path, &pipeline, false, None, &interrupted)
            .expect("collect_log_file 应 Ok");
    assert!(
        rows.is_empty(),
        "interrupted=true 应使循环立即 break,rows 应为空"
    );
}
/// `build_indicator_filters` returns exactly one filter when only
/// `min_row_count` is set and its value is `Some(0)`.
#[test]
fn test_build_indicator_filters_min_row_count_zero() {
    use crate::pipeline::filters::IndicatorFilters;

    let filters = super::prescan::build_indicator_filters(&IndicatorFilters {
        min_row_count: Some(0),
        ..IndicatorFilters::default()
    });
    let built = filters.len();
    assert_eq!(
        built, 1,
        "min_row_count=0 应构建一个全匹配 Filter(FilterBuilder::new().build() 分支)"
    );
}
/// `build_indicator_filters` returns exactly one filter when only
/// `min_row_count` is set and its value is `Some(5)`.
#[test]
fn test_build_indicator_filters_min_row_count_positive() {
    use crate::pipeline::filters::IndicatorFilters;

    let filters = super::prescan::build_indicator_filters(&IndicatorFilters {
        min_row_count: Some(5),
        ..IndicatorFilters::default()
    });
    let built = filters.len();
    assert_eq!(
        built, 1,
        "min_row_count=5 应构建一个带 rowcount_gt(4) 约束的 Filter"
    );
}
/// Default `IndicatorFilters` produce no filters at all.
#[test]
fn test_build_indicator_filters_empty_returns_empty() {
    use crate::pipeline::filters::IndicatorFilters;

    let filters = super::prescan::build_indicator_filters(&IndicatorFilters::default());
    let built = filters.len();
    assert_eq!(built, 0, "所有字段均为 None 时应返回空 Vec<Filter>");
}
/// Three exclude patterns yield three filters from `build_sql_exclude_filters`.
#[test]
fn test_build_sql_exclude_filters_multiple_returns_correct_count() {
    use crate::pipeline::filters::SqlFilters;

    let patterns = vec![
        "SELECT 1".into(),
        "DROP".into(),
        "DELETE FROM x".into(),
    ];
    let sql_filters = SqlFilters {
        excludes: Some(patterns),
        includes: None,
    };

    let built = super::prescan::build_sql_exclude_filters(&sql_filters).len();
    assert_eq!(
        built, 3,
        "3 个 exclude 模式应构建 3 个 Filter(非空 excludes 分支)"
    );
}
/// Default `SqlFilters` yield no exclude filters.
#[test]
fn test_build_sql_exclude_filters_none_returns_empty() {
    use crate::pipeline::filters::SqlFilters;

    let sql_filters = SqlFilters::default();
    let built = super::prescan::build_sql_exclude_filters(&sql_filters).len();
    assert_eq!(
        built, 0,
        "excludes=None 应通过 unwrap_or(&[]) 返回空 Vec<Filter>"
    );
}
/// Two include patterns yield two filters from `build_sql_include_filters`.
#[test]
fn test_build_sql_include_filters_multiple() {
    use crate::pipeline::filters::SqlFilters;

    let patterns = vec!["SELECT".into(), "UPDATE".into()];
    let sql_filters = SqlFilters {
        includes: Some(patterns),
        excludes: None,
    };

    let built = super::prescan::build_sql_include_filters(&sql_filters).len();
    assert_eq!(built, 2, "2 个 include 模式应构建 2 个 Filter");
}
/// Three distinct `exec_ids` yield three filters from
/// `build_indicator_filters`.
#[test]
fn test_build_indicator_filters_exec_ids_multiple() {
    use crate::pipeline::filters::IndicatorFilters;
    use std::collections::HashSet;

    let ids: HashSet<i64> = HashSet::from([1_i64, 2, 42]);
    let filters = super::prescan::build_indicator_filters(&IndicatorFilters {
        exec_ids: Some(ids),
        ..IndicatorFilters::default()
    });
    let built = filters.len();
    assert_eq!(
        built, 3,
        "3 个 exec_ids 应产生 3 个独立的 Filter(每个 ID 一个)"
    );
}
/// With filtering enabled and `min_row_count = Some(0)`,
/// `scan_log_file_for_matches` reports three matches for a file holding three
/// records whose ROWCOUNT values are 0, 1 and 2.
#[test]
fn test_min_row_count_zero_matches_all_records() {
    use crate::pipeline::FiltersFeature;
    use crate::pipeline::filters::IndicatorFilters;
    use std::fmt::Write as _;

    let workdir = tempfile::TempDir::new().unwrap();
    let logfile = workdir.path().join("test.log");

    // One record per index; the index doubles as session, trxid, row count and exec id.
    let mut buf = String::new();
    (0..3_usize)
        .try_for_each(|i| {
            writeln!(
                buf,
                "2025-01-15 10:30:28.001 (EP[0] sess:0x{i:04x} user:U trxid:{i} stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT {i}. EXECTIME: 1(ms) ROWCOUNT: {i}(rows) EXEC_ID: {i}.",
            )
        })
        .unwrap();
    std::fs::write(&logfile, &buf).unwrap();

    let indicators = IndicatorFilters {
        min_row_count: Some(0),
        ..IndicatorFilters::default()
    };
    let feature = FiltersFeature {
        enable: true,
        indicators,
        ..FiltersFeature::default()
    };
    let cfg = Config {
        filter: Some(feature),
        ..Config::default()
    };

    let logfile_text = logfile.to_str().unwrap();
    let matched = super::prescan::scan_log_file_for_matches(logfile_text, &cfg);
    assert_eq!(
        matched.len(),
        3,
        "min_row_count=0 应匹配所有记录(全匹配 Filter),实际匹配: {matched:?}",
    );
}
/// Two files contain trxids `{0, 1}` and `{1, 2}`; after sorting, the result
/// of `scan_for_trxids_by_transaction_filters` must be exactly `["0", "1", "2"]`,
/// i.e. the shared trxid `1` appears once.
#[test]
fn test_scan_for_trxids_by_transaction_filters_dedup_across_files() {
    use crate::pipeline::FiltersFeature;
    use crate::pipeline::filters::IndicatorFilters;
    use std::fmt::Write as _;

    let workdir = tempfile::TempDir::new().unwrap();

    // Writes one record per id into `filename` and hands back the file's path.
    let write_log = |filename: &str, ids: &[usize]| {
        let target = workdir.path().join(filename);
        let mut buf = String::new();
        ids.iter()
            .copied()
            .try_for_each(|i| {
                writeln!(
                    buf,
                    "2025-01-15 10:30:28.001 (EP[0] sess:0x{i:04x} user:U trxid:{i} stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT {i}. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.",
                )
            })
            .unwrap();
        std::fs::write(&target, &buf).unwrap();
        target
    };
    let file1 = write_log("a.log", &[0, 1]);
    let file2 = write_log("b.log", &[1, 2]);

    let indicators = IndicatorFilters {
        min_row_count: Some(0),
        ..IndicatorFilters::default()
    };
    let feature = FiltersFeature {
        enable: true,
        indicators,
        ..FiltersFeature::default()
    };
    let cfg = Config {
        filter: Some(feature),
        ..Config::default()
    };

    let mut matched =
        super::prescan::scan_for_trxids_by_transaction_filters(&[file1, file2], &cfg, 2).unwrap();
    // Sorted before comparing, so the test does not depend on the order returned.
    matched.sort();

    let expected: Vec<String> = ["0", "1", "2"].iter().map(|id| id.to_string()).collect();
    assert_eq!(
        matched, expected,
        "跨文件应返回去重后的 trxid 列表,实际: {matched:?}"
    );
}