use super::super::Exporter;
use super::CsvExporter;
use super::writer::write_csv_escaped;
use dm_database_parser_sqllog::LogParserBuilder;
/// Writes `count` synthetic DM SQL log lines to `path`, one `[SEL]` statement per line.
///
/// Line `i` uses `i` as the session id (4-digit zero-padded hex), transaction id, `EXEC_ID`
/// and the `id=` literal in the SQL text. `EXECTIME` is `(i * 13) % 1000` and `ROWCOUNT` is
/// `i % 100`, so metrics differ between lines (both are zero for `i == 0`).
///
/// # Panics
/// Panics if the file cannot be written.
fn write_test_log(path: &std::path::Path, count: usize) {
    use std::fmt::Write as _;

    // Roughly 170 bytes per generated line; reserve once to avoid regrowth.
    let mut log_text = String::with_capacity(count * 170);
    (0..count).for_each(|i| {
        let exec = (i * 13) % 1000;
        let rows = i % 100;
        writeln!(
            log_text,
            "2025-01-15 10:30:28.001 (EP[0] sess:0x{i:04x} user:TESTUSER trxid:{i} stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT * FROM t WHERE id={i}. EXECTIME: {exec}(ms) ROWCOUNT: {rows}(rows) EXEC_ID: {i}."
        )
        .expect("formatting into a String cannot fail");
    });
    std::fs::write(path, log_text).unwrap();
}
/// Exports five parsed records with the default exporter, then checks the header prefix, the
/// presence of the `normalized_sql` column, and that there is exactly one line per record plus
/// the header.
#[test]
fn test_csv_basic_export() {
    let dir = tempfile::TempDir::new().unwrap();
    let logfile = dir.path().join("test.log");
    let outfile = dir.path().join("out.csv");
    write_test_log(&logfile, 5);
    let parser = LogParserBuilder::new(logfile.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
    // Every generated line must parse; otherwise a partial parse would silently shrink the export.
    assert_eq!(records.len(), 5, "all generated log lines should parse");
    let mut exporter = CsvExporter::new(&outfile);
    exporter.initialize().unwrap();
    for r in &records {
        exporter.export_one_normalized(r, None).unwrap();
    }
    exporter.finalize().unwrap();
    let content = std::fs::read_to_string(&outfile).unwrap();
    assert!(content.starts_with("ts,ep,"));
    assert!(content.contains("normalized_sql"));
    // One header line plus one data line per exported record.
    assert_eq!(content.lines().count(), records.len() + 1);
}
/// With `normalize = false` the CSV output must not contain a `normalized_sql` column.
///
/// The header and row-count checks guard against a vacuous pass: an empty file (or an export of
/// zero records) would trivially "not contain" `normalized_sql`.
#[test]
fn test_csv_no_normalize() {
    let dir = tempfile::TempDir::new().unwrap();
    let logfile = dir.path().join("test.log");
    let outfile = dir.path().join("out.csv");
    write_test_log(&logfile, 2);
    let parser = LogParserBuilder::new(logfile.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
    assert!(!records.is_empty(), "fixture must produce at least one record");
    let mut exporter = CsvExporter::new(&outfile);
    exporter.normalize = false;
    exporter.initialize().unwrap();
    for r in &records {
        exporter.export_one_normalized(r, None).unwrap();
    }
    exporter.finalize().unwrap();
    let content = std::fs::read_to_string(&outfile).unwrap();
    let header = content
        .lines()
        .next()
        .expect("CSV output must contain a header line");
    assert!(header.starts_with("ts,ep,"), "header: {header}");
    assert!(!content.contains("normalized_sql"));
    assert_eq!(content.lines().count(), records.len() + 1);
}
/// With `normalize = true`, the normalized SQL passed to `export_one_normalized` must appear in
/// the output for every record, not just the first one.
#[test]
fn test_csv_export_with_normalized() {
    let dir = tempfile::TempDir::new().unwrap();
    let logfile = dir.path().join("test.log");
    let outfile = dir.path().join("out.csv");
    write_test_log(&logfile, 3);
    let parser = LogParserBuilder::new(logfile.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
    assert!(!records.is_empty(), "fixture must produce at least one record");
    let mut exporter = CsvExporter::new(&outfile);
    exporter.normalize = true;
    exporter.initialize().unwrap();
    for (i, r) in records.iter().enumerate() {
        // Unique per record so each row's normalized value can be located below.
        let ns = format!("SELECT * FROM t WHERE id=?_{i}");
        exporter.export_one_normalized(r, Some(&ns)).unwrap();
    }
    exporter.finalize().unwrap();
    let content = std::fs::read_to_string(&outfile).unwrap();
    for i in 0..records.len() {
        let expected = format!("SELECT * FROM t WHERE id=?_{i}");
        assert!(
            content.contains(&expected),
            "missing normalized SQL `{expected}` in output"
        );
    }
}
/// Exports the same records twice to one file: first with overwrite, then with append.
/// The second run must add only data rows, with no second header.
#[test]
fn test_csv_append_mode() {
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("test.log");
    let csv_path = tmp.path().join("out.csv");
    write_test_log(&log_path, 2);
    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = log_parser.iter().flatten().collect();

    // One complete initialize -> export -> finalize pass using the given file-mode flags.
    let run_pass = |overwrite: bool, append: bool| {
        let cfg = crate::config::CsvExporterConfig {
            file: csv_path.to_string_lossy().into(),
            overwrite,
            append,
            ..crate::config::CsvExporterConfig::default()
        };
        let mut exporter = CsvExporter::from_config(&cfg);
        exporter.initialize().unwrap();
        for record in &records {
            exporter.export_one_normalized(record, None).unwrap();
        }
        exporter.finalize().unwrap();
    };
    let line_count = || std::fs::read_to_string(&csv_path).unwrap().lines().count();

    run_pass(true, false);
    let first_count = line_count();
    run_pass(false, true);
    let second_count = line_count();

    assert_eq!(
        second_count,
        first_count + records.len(),
        "append mode must add only data rows, not a second header"
    );
}
/// Initializing and finalizing without exporting anything leaves exactly one line in the file.
/// "noop" refers to the absence of data rows; the header is still written.
#[test]
fn test_csv_empty_export_is_noop() {
    let tmp = tempfile::TempDir::new().unwrap();
    let csv_path = tmp.path().join("out.csv");
    let mut exporter = CsvExporter::new(&csv_path);
    exporter.initialize().unwrap();
    exporter.finalize().unwrap();
    let written = std::fs::read_to_string(&csv_path).unwrap();
    let line_total = written.lines().count();
    assert_eq!(line_total, 1);
}
/// The `Debug` representation of an exporter names its type.
#[test]
fn test_csv_debug_format() {
    let tmp = tempfile::TempDir::new().unwrap();
    let rendered = format!("{:?}", CsvExporter::new(tmp.path().join("debug.csv")));
    assert!(rendered.contains("CsvExporter"));
}
/// Exports through the plain `export` entry point and expects one CSV line per record plus
/// the header.
#[test]
fn test_csv_export_method() {
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("test.log");
    let csv_path = tmp.path().join("out.csv");
    write_test_log(&log_path, 3);

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = log_parser.iter().flatten().collect();

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.initialize().unwrap();
    for record in records.iter() {
        exporter.export(record).unwrap();
    }
    exporter.finalize().unwrap();

    let expected_lines = records.len() + 1; // header + data rows
    let actual_lines = std::fs::read_to_string(&csv_path).unwrap().lines().count();
    assert_eq!(actual_lines, expected_lines);
}
/// After five records have been exported, and before finalizing, the stats snapshot reports
/// `exported == 5`.
#[test]
fn test_csv_stats_snapshot() {
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("test.log");
    let csv_path = tmp.path().join("out.csv");
    write_test_log(&log_path, 5);

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = log_parser.iter().flatten().collect();

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.initialize().unwrap();
    for record in records.iter() {
        exporter.export(record).unwrap();
    }
    // The snapshot is taken while the exporter is still open.
    let snapshot = exporter.stats_snapshot().unwrap();
    assert_eq!(snapshot.exported, 5);
    exporter.finalize().unwrap();
}
/// Embedded double quotes are escaped by doubling them.
#[test]
fn test_write_csv_escaped_with_quotes() {
    let mut out: Vec<u8> = Vec::new();
    write_csv_escaped(&mut out, b"say \"hello\"");
    let expected: &[u8] = b"say \"\"hello\"\"";
    assert_eq!(out, expected);
}
/// Input without quotes is copied through unchanged.
#[test]
fn test_write_csv_escaped_no_quotes() {
    let input: &[u8] = b"no quotes here";
    let mut out: Vec<u8> = Vec::new();
    write_csv_escaped(&mut out, input);
    assert_eq!(out, input);
}
/// `CsvExporter::from_config` builds a usable exporter and carries over
/// `include_performance_metrics` from the config.
#[test]
fn test_csv_from_config() {
    use crate::config::CsvExporterConfig;
    let tmp = tempfile::TempDir::new().unwrap();
    let csv_file = tmp.path().join("cfg.csv").to_string_lossy().into_owned();
    let cfg = CsvExporterConfig {
        file: csv_file,
        overwrite: true,
        append: false,
        ..CsvExporterConfig::default()
    };
    let exporter = CsvExporter::from_config(&cfg);
    let debug_repr = format!("{exporter:?}");
    assert!(debug_repr.contains("CsvExporter"));
    assert_eq!(
        exporter.include_performance_metrics, cfg.include_performance_metrics,
        "from_config must map include_performance_metrics correctly"
    );
}
/// The header follows `ordered_indices`, not the natural field order. With indices `[10, 4]`
/// (sql, username) it must read `sql,username`.
#[test]
fn test_csv_header_field_order() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let csv_path = tmp.path().join("out.csv");
    let wanted = ["sql".to_string(), "username".to_string()];

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.field_mask = FieldMask::from_names(&wanted).unwrap();
    exporter.ordered_indices = vec![10, 4];
    exporter.initialize().unwrap();
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let first_line = csv_text.lines().next().unwrap();
    assert_eq!(first_line, "sql,username");
}
/// With `normalize` enabled and the default projection, the header lists every entry of
/// `FIELD_NAMES` in declaration order.
#[test]
fn test_csv_header_full_order() {
    use crate::pipeline::FIELD_NAMES;
    let tmp = tempfile::TempDir::new().unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = true;
    exporter.initialize().unwrap();
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let first_line = csv_text.lines().next().unwrap();
    assert_eq!(first_line, FIELD_NAMES.join(","));
}
/// With `normalize = false` the header drops `normalized_sql` but keeps the raw `sql` column.
#[test]
fn test_csv_header_no_normalized_sql_when_normalize_false() {
    let tmp = tempfile::TempDir::new().unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = false;
    exporter.initialize().unwrap();
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let first_line = csv_text.lines().next().unwrap();
    assert!(!first_line.contains("normalized_sql"));
    assert!(first_line.contains("sql"));
}
/// Data rows follow the same custom column order as the header: `sql` first, then `username`.
#[test]
fn test_csv_field_order() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("t.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:testuser trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = false;
    exporter.field_mask =
        FieldMask::from_names(&["sql".to_string(), "username".to_string()]).unwrap();
    exporter.ordered_indices = vec![10, 4];
    exporter.initialize().unwrap();

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let (header, data) = {
        let mut rows = csv_text.lines();
        (rows.next().unwrap(), rows.next().unwrap())
    };
    assert_eq!(header, "sql,username");
    assert!(data.ends_with(",testuser"), "data line: {data}");
}
/// Even if `ordered_indices` names index 14 (`normalized_sql`), the column is not emitted when
/// `normalize` is disabled.
#[test]
fn test_csv_field_order_normalized_sql_skipped_when_normalize_false() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("t.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = false;
    exporter.ordered_indices = vec![10, 14];
    exporter.field_mask = FieldMask::from_names(&["sql".to_string()]).unwrap();
    exporter.initialize().unwrap();

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let header = csv_text.lines().next().unwrap();
    assert!(!header.contains("normalized_sql"), "header: {header}");
}
/// A very short SQL statement still produces exactly one well-formed data row.
#[test]
fn test_csv_reserve_boundary_short_sql() {
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("short.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.initialize().unwrap();
    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let rows: Vec<&str> = csv_text.lines().collect();
    assert_eq!(rows.len(), 2);
    let data = rows[1];
    // The SQL cell is quoted in the output.
    assert!(data.contains("\"SELECT 1"), "data row: {data}");
}
/// A 4 KiB SQL literal survives the export intact and fits in a single data row.
#[test]
fn test_csv_reserve_boundary_long_sql() {
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("long.log");
    let big_sql = "x".repeat(4096);
    let log_line = format!(
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT '{big_sql}'. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n"
    );
    std::fs::write(&log_path, &log_line).unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.initialize().unwrap();
    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    assert_eq!(csv_text.lines().count(), 2);
    assert!(csv_text.contains(&big_sql), "long SQL missing from output");
}
/// With `include_performance_metrics = false` the three metric columns are removed from the
/// header while `sql` remains.
#[test]
fn test_csv_header_skips_pm_when_disabled() {
    let tmp = tempfile::TempDir::new().unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.include_performance_metrics = false;
    exporter.initialize().unwrap();
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let header = csv_text.lines().next().unwrap();
    for metric in ["exec_time_ms", "row_count", "exec_id"] {
        assert!(!header.contains(metric), "header: {header}");
    }
    assert!(header.contains("sql"), "sql column should remain");
}
/// With `include_performance_metrics = false`, both the header and every data row must omit
/// the metric columns.
///
/// The header is checked for the absence of the metric names and for a width of 12 columns.
/// Every data row is then required to exist and to have the same width as the header, so a
/// row that still carried the metric cells would be caught. The fixture's fields contain no
/// commas, so splitting on `,` counts columns.
#[test]
fn test_csv_data_row_skips_pm_when_disabled() {
    let dir = tempfile::TempDir::new().unwrap();
    let logfile = dir.path().join("test.log");
    let outfile = dir.path().join("out.csv");
    write_test_log(&logfile, 3);
    let parser = LogParserBuilder::new(logfile.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = parser.iter().filter_map(std::result::Result::ok).collect();
    assert!(!records.is_empty(), "fixture must produce at least one record");
    let mut exporter = CsvExporter::new(&outfile);
    exporter.include_performance_metrics = false;
    exporter.initialize().unwrap();
    for r in &records {
        exporter.export(r).unwrap();
    }
    exporter.finalize().unwrap();
    let content = std::fs::read_to_string(&outfile).unwrap();
    let mut lines = content.lines();
    let header = lines.next().unwrap();
    assert!(
        !header.contains("exec_time_ms"),
        "exec_time_ms should be absent: {header}"
    );
    assert!(
        !header.contains("row_count"),
        "row_count should be absent: {header}"
    );
    assert!(
        !header.contains("exec_id"),
        "exec_id should be absent: {header}"
    );
    assert!(header.contains("sql"), "sql column should remain: {header}");
    let header_cols = header.split(',').count();
    assert_eq!(header_cols, 12, "header col count: {header}");

    // The remaining lines are the data rows this test is named after.
    let data_rows: Vec<&str> = lines.collect();
    assert_eq!(
        data_rows.len(),
        records.len(),
        "expected one data row per record"
    );
    for row in &data_rows {
        assert_eq!(
            row.split(',').count(),
            header_cols,
            "data row must match header width: {row}"
        );
    }
}
/// With the default configuration, performance metrics are exported, so the header keeps the
/// `exec_time_ms`, `row_count` and `exec_id` columns.
#[test]
fn test_csv_default_include_pm_true_keeps_existing_behavior() {
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("test.log");
    let csv_path = tmp.path().join("out.csv");
    write_test_log(&log_path, 2);

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    let records: Vec<_> = log_parser.iter().flatten().collect();

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.initialize().unwrap();
    for record in records.iter() {
        exporter.export(record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let header = csv_text.lines().next().unwrap();
    assert!(header.contains("exec_time_ms"));
    assert!(header.contains("row_count"));
    assert!(header.contains("exec_id"));
}
/// A record whose EXECTIME, ROWCOUNT and EXEC_ID are all zero is exported with empty metric
/// cells (the assertion message calls this `has_metrics=false`), which shows up as `,,` in
/// the data row.
#[test]
fn test_csv_all_zero_metrics_outputs_empty_columns() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("zero.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 0(ms) ROWCOUNT: 0(rows) EXEC_ID: 0.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.field_mask = FieldMask::ALL;
    exporter.initialize().unwrap();
    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let data_row = csv_text.lines().nth(1).unwrap();
    assert!(
        data_row.contains(",,"),
        "has_metrics=false 时数据行应含 ',,' 表示空指标列,实际: {data_row}"
    );
}
/// Projecting to `ts`, `ep` and `sess_id` yields a 3-column header and 3-column data rows.
///
/// The data-row count is asserted first. Without it, the per-row column loop would pass
/// vacuously if the exporter emitted only the header.
#[test]
fn test_csv_projection_subset_emits_only_requested_columns() {
    use crate::pipeline::FieldMask;
    let dir = tempfile::TempDir::new().unwrap();
    let log = dir.path().join("t.log");
    write_test_log(&log, 3);
    let out = dir.path().join("out.csv");
    let mut exporter = CsvExporter::new(&out);
    exporter.normalize = false;
    exporter.field_mask =
        FieldMask::from_names(&["ts".to_string(), "ep".to_string(), "sess_id".to_string()])
            .unwrap();
    exporter.ordered_indices = vec![0, 1, 2];
    exporter.initialize().unwrap();
    let parser = LogParserBuilder::new(log.to_str().unwrap())
        .build()
        .unwrap();
    for record in parser.iter().flatten() {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();
    let content = std::fs::read_to_string(&out).unwrap();
    let header = content.lines().next().unwrap();
    assert_eq!(
        header, "ts,ep,sess_id",
        "header 应只含投影字段,实际: {header}"
    );
    let data_rows: Vec<&str> = content.lines().skip(1).collect();
    assert_eq!(
        data_rows.len(),
        3,
        "expected one data row per generated log line"
    );
    for (i, line) in data_rows.iter().enumerate() {
        let col_count = line.split(',').count();
        assert_eq!(col_count, 3, "数据行 {i} 应有 3 列,实际: {line}");
    }
}
/// Projecting indices 6..=9 emits the `statement`, `appname`, `client_ip` and `tag` columns,
/// and the data row carries the values parsed from the log line.
#[test]
fn test_csv_projection_statement_appname_client_ip_tag() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("t.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x99 appname:MyApp ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let projected: Vec<String> = ["statement", "appname", "client_ip", "tag"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = false;
    exporter.field_mask = FieldMask::from_names(&projected).unwrap();
    exporter.ordered_indices = vec![6, 7, 8, 9];
    exporter.initialize().unwrap();

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let mut rows = csv_text.lines();
    let header = rows.next().unwrap();
    assert_eq!(
        header, "statement,appname,client_ip,tag",
        "header 应含 statement/appname/client_ip/tag,实际: {header}"
    );
    let data = rows.next().unwrap();
    assert!(
        data.contains("MyApp"),
        "数据行应含 appname=MyApp,实际: {data}"
    );
    assert!(
        data.contains("10.0.0.1"),
        "数据行应含 client_ip=10.0.0.1,实际: {data}"
    );
}
/// On the projection path, a record whose metrics are all zero yields three empty metric
/// cells, so the data row is exactly `,,`.
#[test]
fn test_csv_projection_zero_metrics_skips_idx_11_12_13() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("zero.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 0(ms) ROWCOUNT: 0(rows) EXEC_ID: 0.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let metric_names: Vec<String> = ["exec_time_ms", "row_count", "exec_id"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = false;
    exporter.field_mask = FieldMask::from_names(&metric_names).unwrap();
    exporter.ordered_indices = vec![11, 12, 13];
    exporter.initialize().unwrap();

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export(&record).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let data_row = csv_text.lines().nth(1).unwrap();
    assert_eq!(
        data_row, ",,",
        "has_metrics=false 时投影路径下三列均应为空,实际: {data_row}"
    );
}
/// Projecting only index 14 with `normalize` on emits a single `normalized_sql` column, and
/// that column holds the value passed to `export_one_normalized`.
#[test]
fn test_csv_projection_with_normalize_idx_14() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("t.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT * FROM t WHERE id=1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = true;
    exporter.field_mask = FieldMask::from_names(&["normalized_sql".to_string()]).unwrap();
    exporter.ordered_indices = vec![14];
    exporter.initialize().unwrap();

    let expected_sql = "SELECT * FROM t WHERE id=?";
    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter
            .export_one_normalized(&record, Some(expected_sql))
            .unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let mut rows = csv_text.lines();
    let header = rows.next().unwrap();
    assert_eq!(
        header, "normalized_sql",
        "header 应为 normalized_sql,实际: {header}"
    );
    let data = rows.next().unwrap();
    assert!(
        data.contains(expected_sql),
        "数据行应含 normalized SQL,实际: {data}"
    );
}
/// Projecting only `normalized_sql` but passing `None` for it produces an empty data line.
#[test]
fn test_csv_projection_with_normalize_none_emits_empty_idx_14() {
    use crate::pipeline::FieldMask;
    let tmp = tempfile::TempDir::new().unwrap();
    let log_path = tmp.path().join("t.log");
    std::fs::write(
        &log_path,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();
    let csv_path = tmp.path().join("out.csv");

    let mut exporter = CsvExporter::new(&csv_path);
    exporter.normalize = true;
    exporter.field_mask = FieldMask::from_names(&["normalized_sql".to_string()]).unwrap();
    exporter.ordered_indices = vec![14];
    exporter.initialize().unwrap();

    let log_parser = LogParserBuilder::new(log_path.to_str().unwrap())
        .build()
        .unwrap();
    for record in log_parser.iter().filter_map(Result::ok) {
        exporter.export_one_normalized(&record, None).unwrap();
    }
    exporter.finalize().unwrap();

    let csv_text = std::fs::read_to_string(&csv_path).unwrap();
    let data_row = csv_text.lines().nth(1).unwrap();
    assert_eq!(
        data_row, "",
        "normalized_sql=None 时数据行应为空,实际: {data_row}"
    );
}
}