use inklog::config::DatabaseDriver;
use inklog::sink::database::DatabaseSink;
use inklog::sink::LogSink;
use inklog::{log_record::LogRecord, DatabaseSinkConfig};
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
use tracing::Level;
fn create_test_database_sink(
batch_size: usize,
flush_interval_ms: u64,
) -> (TempDir, DatabaseSink, String) {
let temp_dir = tempfile::TempDir::new().expect("Failed to create temp directory");
let db_path = temp_dir.path().join("test.db");
let url = format!("sqlite://{}?mode=rwc", db_path.display());
let config = DatabaseSinkConfig {
enabled: true,
driver: DatabaseDriver::SQLite,
url: url.clone(),
batch_size,
flush_interval_ms,
..Default::default()
};
let sink = DatabaseSink::new(config).expect("Failed to create DatabaseSink");
(temp_dir, sink, url)
}
/// Returns the number of log rows stored in the database reachable at `url`.
///
/// The function creates its own Tokio runtime and blocks on it, opening a new
/// connection to the database for the duration of the query.
///
/// The rows are counted by the database itself rather than being loaded into
/// memory and measured, so the cost does not grow with the size of each row.
///
/// # Panics
///
/// Panics if the runtime cannot be created, the connection fails, the count
/// query fails, or the count does not fit in an `i64`.
fn count_database_logs(url: &str) -> i64 {
    use inklog::sink::database::Entity;
    use sea_orm::{Database, EntityTrait, PaginatorTrait};

    let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
    rt.block_on(async {
        let db = Database::connect(url)
            .await
            .expect("Failed to connect to database");

        // Ask the database for the row count instead of fetching every row.
        let total = Entity::find()
            .count(&db)
            .await
            .expect("Failed to query logs");

        // Checked conversion: a silent wrap-around would corrupt test assertions.
        i64::try_from(total).expect("log count does not fit in i64")
    })
}
/// Exercises both flush paths of a sink configured with a batch size of 5 and
/// a flush interval of 1000 ms.
///
/// Phase 1: three records are written, the test waits longer than the flush
/// interval, then writes one more record and expects all four rows to be in
/// the database shortly afterwards.
///
/// Phase 2: five more records (a full batch) are written and the test expects
/// nine rows in total.
#[test]
fn test_database_batch_write() {
    let (_db_dir, mut sink, url) = create_test_database_sink(5, 1000);

    // Shared write step: build an INFO record for this test and push it to the sink.
    let mut emit = |message: String| {
        let record = LogRecord::new(Level::INFO, "batch_test".into(), message);
        sink.write(&record).expect("Failed to write log record");
    };

    // Phase 1: fewer records than the batch size, then outlast the flush interval.
    (0..3)
        .map(|i| format!("Message {}", i))
        .for_each(&mut emit);
    std::thread::sleep(Duration::from_millis(1100));
    emit("Trigger flush".into());
    std::thread::sleep(Duration::from_millis(200));

    let count_before = count_database_logs(&url);
    assert_eq!(count_before, 4, "时间刷新应该写入4条记录");

    // Phase 2: exactly one batch worth of records.
    (4..9)
        .map(|i| format!("Message {}", i))
        .for_each(&mut emit);
    std::thread::sleep(Duration::from_millis(500));

    let count_after = count_database_logs(&url);
    assert_eq!(
        count_after, 9,
        "批次写入应该触发,当前记录数: {}",
        count_after
    );
    println!("批量写入测试通过!批次大小: 5, 实际写入: {}", count_after);
}
/// Checks that records reach the database through the interval-based flush
/// when the batch size (100) is far larger than the number of records written.
///
/// Two records are written, each followed by a 500 ms pause (longer than the
/// 300 ms flush interval); at least one row must then be present.
#[test]
fn test_database_timeout_flush() {
    let (_db_dir, mut sink, url) = create_test_database_sink(100, 300);
    let pause = Duration::from_millis(500);

    // Each step: message text paired with the panic message used if the write fails.
    let steps = [
        ("First message", "Failed to write first log record"),
        ("Second message", "Failed to write second log record"),
    ];
    for (text, failure) in steps {
        let record = LogRecord::new(Level::INFO, "timeout_test".into(), text.into());
        sink.write(&record).expect(failure);
        // Wait past the flush interval before the next step.
        std::thread::sleep(pause);
    }

    let count = count_database_logs(&url);
    assert!(count >= 1, "超时刷新应该触发写入,当前记录数: {}", count);
    println!("超时刷新测试通过!刷新间隔: 300ms, 实际写入: {}", count);
}