use super::debounce::should_trigger;
use super::dirs::collect_watch_dirs;
use super::handler::handle_watch;
use super::state::{DEBOUNCE_WINDOW, WatchLoopState};
use super::status::{format_elapsed_hms, render_active_status};
use super::trigger_full::trigger_full_file;
use super::trigger_incremental::{read_bytes_to_tempfile, trigger_incremental};
use crate::config::Config;
use crate::error::Error;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, Instant};
#[test]
fn test_interrupted_flag_exits_immediately() {
let cfg = Config::default();
let interrupted = Arc::new(AtomicBool::new(true));
let result = handle_watch(&cfg, true, false, &interrupted);
let _ = result;
}
#[test]
fn test_watch_loop_state_new_with_offsets() {
let mut initial_offsets = HashMap::new();
initial_offsets.insert(PathBuf::from("/tmp/test.log"), 1024u64);
let state = WatchLoopState::new(initial_offsets.clone(), Some("test.db".to_string()));
assert_eq!(
state.file_offsets.get(&PathBuf::from("/tmp/test.log")),
Some(&1024u64)
);
assert_eq!(state.sqlite_db_url, Some("test.db".to_string()));
assert_eq!(state.trigger_count, 0);
}
#[test]
fn test_watch_loop_state_new_without_sqlite() {
let state = WatchLoopState::new(HashMap::new(), None);
assert!(state.file_offsets.is_empty());
assert!(state.sqlite_db_url.is_none());
}
#[test]
fn test_collect_watch_dirs_nonexistent_glob_returns_empty() {
let dirs = collect_watch_dirs(&["nonexistent_dir_xyz/*.log".to_string()]);
assert!(
dirs.is_empty(),
"nonexistent glob parent should yield empty Vec, got: {dirs:?}"
);
}
#[test]
fn test_collect_watch_dirs_existing_dir_returns_itself() {
let tmp = tempfile::TempDir::new().expect("failed to create tempdir");
let dir_path = tmp.path().to_string_lossy().into_owned();
let dirs = collect_watch_dirs(&[dir_path]);
assert_eq!(dirs.len(), 1, "existing dir should yield exactly 1 entry");
assert_eq!(
dirs[0].canonicalize().ok(),
tmp.path().canonicalize().ok(),
"returned dir should match the temp dir"
);
}
#[test]
fn test_format_elapsed_hms_zero() {
assert_eq!(
format_elapsed_hms(Duration::from_secs(0)),
"00:00:00",
"zero seconds should format as 00:00:00"
);
}
#[test]
fn test_format_elapsed_hms_3661_seconds() {
assert_eq!(
format_elapsed_hms(Duration::from_secs(3661)),
"01:01:01",
"3661 seconds should format as 01:01:01"
);
}
#[test]
fn test_should_trigger_first_call_admits_and_records() {
let mut map: HashMap<PathBuf, Instant> = HashMap::new();
let path = PathBuf::from("/tmp/test.log");
let now = Instant::now();
let result = should_trigger(&path, &mut map, now, DEBOUNCE_WINDOW);
assert!(result, "首次调用应返回 true(允许触发)");
assert!(
map.contains_key(&path),
"首次触发后,路径应写入 debounce_map"
);
let recorded = map[&path];
assert_eq!(recorded, now, "记录的时间戳应等于传入的 now");
}
#[test]
fn test_should_trigger_within_window_rejects() {
let mut map: HashMap<PathBuf, Instant> = HashMap::new();
let path = PathBuf::from("/tmp/test2.log");
let t0 = Instant::now();
let first = should_trigger(&path, &mut map, t0, DEBOUNCE_WINDOW);
assert!(first, "首次调用应返回 true");
let t1 = t0 + Duration::from_millis(100);
let second = should_trigger(&path, &mut map, t1, DEBOUNCE_WINDOW);
assert!(!second, "窗口期 100ms 内再次触发应被抑制(返回 false)");
let t2 = t0 + Duration::from_millis(600);
let third = should_trigger(&path, &mut map, t2, DEBOUNCE_WINDOW);
assert!(third, "窗口期外(600ms > 500ms)应允许触发(返回 true)");
assert_eq!(map[&path], t2, "允许触发后,表项应更新为最新时间戳");
}
#[test]
fn test_should_trigger_isolates_distinct_paths() {
let mut map: HashMap<PathBuf, Instant> = HashMap::new();
let path1 = PathBuf::from("/tmp/a.log");
let path2 = PathBuf::from("/tmp/b.log");
let t0 = Instant::now();
let _ = should_trigger(&path1, &mut map, t0, DEBOUNCE_WINDOW);
let t1 = t0 + Duration::from_millis(100);
let result = should_trigger(&path2, &mut map, t1, DEBOUNCE_WINDOW);
assert!(result, "不同路径(P2)不受 P1 防抖窗口影响,应返回 true");
}
#[test]
fn test_render_active_status_includes_human_duration() {
let status_zero = render_active_status("/foo", 3, 42, Duration::from_secs(0));
assert!(
status_zero.contains("triggers: 3"),
"状态行应包含 triggers: 3,实际: {status_zero}"
);
assert!(
status_zero.contains("processed: 42 rows"),
"状态行应包含 processed: 42 rows,实际: {status_zero}"
);
assert!(
status_zero.contains("last: "),
"状态行应包含 last: 前缀,实际: {status_zero}"
);
let status_125 = render_active_status("/foo", 1, 1, Duration::from_secs(125));
assert!(
status_125.contains('m'),
"125 秒的 HumanDuration 输出应包含 'm'(分钟),实际: {status_125}"
);
}
#[test]
fn test_read_bytes_to_tempfile_reads_from_offset() {
use std::io::Read;
let mut source_file = tempfile::NamedTempFile::new().expect("failed to create tmp");
let content = b"Hello World! This is a test file with 50 bytes content!";
std::io::Write::write_all(&mut source_file, content).unwrap();
let source_path = source_file.path().to_path_buf();
let start_offset = 13u64; let tmp_result = read_bytes_to_tempfile(&source_path, start_offset)
.expect("read_bytes_to_tempfile should succeed");
let mut actual_content = Vec::new();
std::fs::File::open(tmp_result.path())
.unwrap()
.read_to_end(&mut actual_content)
.unwrap();
let start_idx = usize::try_from(start_offset).expect("offset fits in usize");
assert_eq!(
actual_content,
&content[start_idx..],
"临时文件应仅包含 start_offset 之后的字节"
);
}
#[test]
fn test_read_bytes_to_tempfile_at_eof_produces_empty() {
use std::io::Read;
let mut source_file = tempfile::NamedTempFile::new().expect("failed to create tmp");
let content = b"some content here";
std::io::Write::write_all(&mut source_file, content).unwrap();
let source_path = source_file.path().to_path_buf();
let start_offset = content.len() as u64; let tmp_result = read_bytes_to_tempfile(&source_path, start_offset)
.expect("read_bytes_to_tempfile at EOF should succeed");
let mut actual_content = Vec::new();
std::fs::File::open(tmp_result.path())
.unwrap()
.read_to_end(&mut actual_content)
.unwrap();
assert!(
actual_content.is_empty(),
"offset == 文件大小时,临时文件应为空,实际: {actual_content:?}"
);
}
#[test]
fn test_trigger_incremental_skips_if_no_new_bytes() {
let source_file = tempfile::NamedTempFile::new().expect("failed to create tmp");
let content = vec![0u8; 100];
std::fs::write(source_file.path(), &content).unwrap();
let canonical_path = source_file.path().canonicalize().unwrap();
let mut initial_offsets = HashMap::new();
initial_offsets.insert(canonical_path, 100u64);
let mut state = WatchLoopState::new(initial_offsets, None);
let cfg = Config::default();
let interrupted = Arc::new(AtomicBool::new(false));
let pb = indicatif::ProgressBar::hidden();
trigger_incremental(
source_file.path(),
&cfg,
true,
false,
&interrupted,
&mut state,
&pb,
);
assert_eq!(
state.trigger_count, 0,
"new_size == start_offset 时 trigger_count 不应增加"
);
}
#[test]
fn test_trigger_incremental_first_seen_records_baseline() {
let source_file = tempfile::NamedTempFile::new().expect("failed to create tmp");
let content = vec![0u8; 100];
std::fs::write(source_file.path(), &content).unwrap();
let canonical_path = source_file.path().canonicalize().unwrap();
let mut state = WatchLoopState::new(HashMap::new(), None);
let cfg = Config::default();
let interrupted = Arc::new(AtomicBool::new(false));
let pb = indicatif::ProgressBar::hidden();
trigger_incremental(
source_file.path(),
&cfg,
true,
false,
&interrupted,
&mut state,
&pb,
);
assert_eq!(
state.trigger_count, 0,
"首次 Modify 应跳过处理,trigger_count 不增"
);
assert_eq!(
state.file_offsets.get(&canonical_path),
Some(&100u64),
"首次 Modify 应将当前文件大小作为基线写入 file_offsets"
);
}
const DM_LOG_LINE_SEL: &str = "2024-01-15 10:00:00.123 (EP[0] sess:0x0001 thrd:456 user:SYSDBA trxid:1001 stmt:select_001 appname:test ip:127.0.0.1) [SEL] select * from t1. EXECTIME: 5(ms) ROWCOUNT: 10(rows) EXEC_ID: 1.\n";
const DM_LOG_LINE_SEL2: &str = "2024-01-15 10:00:01.456 (EP[0] sess:0x0002 thrd:457 user:SYSDBA trxid:1002 stmt:select_002 appname:test ip:127.0.0.1) [SEL] select * from t2. EXECTIME: 3(ms) ROWCOUNT: 5(rows) EXEC_ID: 2.\n";
const DM_LOG_LINE_GARBAGE: &str = "this is not a valid dm sql log line at all\n";
#[test]
fn test_watch_csv_append() {
let dir = tempfile::TempDir::new().unwrap();
let log_a = dir.path().join("a.log");
let log_b = dir.path().join("b.log");
let csv_path = dir.path().join("out.csv");
std::fs::write(&log_a, DM_LOG_LINE_SEL).unwrap();
std::fs::write(&log_b, DM_LOG_LINE_SEL2).unwrap();
let toml = format!(
"[sqllog]\ninputs = [\"{logdir}\"]\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
logdir = dir.path().to_string_lossy().replace('\\', "/"),
csv = csv_path.to_string_lossy().replace('\\', "/"),
);
let cfg: crate::config::Config = toml::from_str(&toml).unwrap();
let interrupted = Arc::new(AtomicBool::new(false));
let pb = indicatif::ProgressBar::hidden();
let mut state = WatchLoopState::new(HashMap::new(), None);
trigger_full_file(&log_a, &cfg, true, false, &interrupted, &mut state, &pb);
trigger_full_file(&log_b, &cfg, true, false, &interrupted, &mut state, &pb);
assert!(csv_path.exists(), "CSV 文件应在触发后存在");
let content = std::fs::read_to_string(&csv_path).unwrap();
let lines: Vec<&str> = content.lines().collect();
assert!(
lines.len() >= 3,
"CSV 应有 header + 2 data rows,实际: {} 行,内容:\n{content}",
lines.len()
);
let header = lines[0];
let header_count = lines.iter().filter(|&&l| l == header).count();
assert_eq!(
header_count, 1,
"header 行应只出现一次,实际出现 {header_count} 次"
);
}
#[test]
fn test_watch_error_log_append() {
let dir = tempfile::TempDir::new().unwrap();
let log_a = dir.path().join("a.log");
let log_b = dir.path().join("b.log");
let csv_path = dir.path().join("out.csv");
let error_log_path = dir.path().join("errors.log");
let content_a = format!("{DM_LOG_LINE_GARBAGE}{DM_LOG_LINE_SEL}");
let content_b = format!("{DM_LOG_LINE_GARBAGE}{DM_LOG_LINE_SEL2}");
std::fs::write(&log_a, &content_a).unwrap();
std::fs::write(&log_b, &content_b).unwrap();
let toml = format!(
"[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
logdir = dir.path().to_string_lossy().replace('\\', "/"),
errlog = error_log_path.to_string_lossy().replace('\\', "/"),
csv = csv_path.to_string_lossy().replace('\\', "/"),
);
let cfg: crate::config::Config = toml::from_str(&toml).unwrap();
let interrupted = Arc::new(AtomicBool::new(false));
let pb = indicatif::ProgressBar::hidden();
let mut state = WatchLoopState::new(HashMap::new(), None);
trigger_full_file(&log_a, &cfg, true, false, &interrupted, &mut state, &pb);
trigger_full_file(&log_b, &cfg, true, false, &interrupted, &mut state, &pb);
assert!(error_log_path.exists(), "error log 应在有解析错误时创建");
let error_content = std::fs::read_to_string(&error_log_path).unwrap();
let error_line_count = error_content
.lines()
.filter(|l| l.starts_with("[ERROR]"))
.count();
assert!(
error_line_count >= 2,
"error log 应包含 2 条 [ERROR] 行(来自 A 和 B 各 1 次触发),实际: {error_line_count} 条\n内容:\n{error_content}"
);
}
#[test]
fn test_handle_watch_returns_interrupted() {
let dir = tempfile::TempDir::new().unwrap();
let csv_path = dir.path().join("out.csv");
let toml = format!(
"[sqllog]\ninputs = [\"{logdir}\"]\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n",
logdir = dir.path().to_string_lossy().replace('\\', "/"),
csv = csv_path.to_string_lossy().replace('\\', "/"),
);
let cfg: crate::config::Config = toml::from_str(&toml).unwrap();
let interrupted = Arc::new(AtomicBool::new(true));
let result = handle_watch(&cfg, true, false, &interrupted);
assert!(
matches!(result, Err(Error::Interrupted)),
"interrupted=true 时 handle_watch 应返回 Err(Error::Interrupted),实际: {result:?}"
);
}