use crate::error::{Error, Result};
use crate::error_logger::ErrorLogger;
use crate::exporter::ExporterManager;
use crate::parser::SqllogParser;
use crate::{config::Config, error::ParserError};
use dm_database_parser_sqllog::LogParser;
use log::{info, warn};
use std::time::Instant;
/// Streams a single log file through the parser and hands the parsed records
/// to `exporter_manager` in batches.
///
/// Records are accumulated and exported whenever `BATCH_SIZE` of them are
/// pending; any remainder is exported once the file has been fully iterated.
/// A record that fails to parse does not abort the file: the records pending
/// at that point are exported first, then the failure is passed to
/// `error_logger`. If recording the failure itself fails, only a warning is
/// logged and processing continues.
///
/// # Errors
///
/// Returns `Error::Parser(ParserError::InvalidPath { .. })` when the parser
/// cannot be created for `file_path`, and propagates any error returned by
/// `ExporterManager::export_batch`.
fn process_log_file(
    file_path: &str,
    exporter_manager: &mut ExporterManager,
    error_logger: &mut ErrorLogger,
) -> Result<()> {
    /// Number of pending records that triggers an export.
    const BATCH_SIZE: usize = 1000;

    info!("Processing file: {}", file_path);

    let parser = LogParser::from_path(file_path).map_err(|cause| {
        Error::Parser(ParserError::InvalidPath {
            path: file_path.into(),
            reason: cause.to_string(),
        })
    })?;

    let mut pending = Vec::with_capacity(BATCH_SIZE);

    for parsed in parser.iter() {
        // Split the item into "record queued" vs. "failure to report".
        let failure = match parsed {
            Ok(record) => {
                pending.push(record);
                None
            }
            Err(err) => Some(err),
        };

        // A full batch is always exported; a parse failure additionally
        // forces out whatever is pending before the failure is recorded.
        let flush_now = if failure.is_some() {
            !pending.is_empty()
        } else {
            pending.len() >= BATCH_SIZE
        };
        if flush_now {
            exporter_manager.export_batch(&pending)?;
            pending.clear();
        }

        if let Some(err) = failure {
            // Recording the parse error is best-effort: a failure here is
            // downgraded to a warning instead of aborting the file.
            if let Err(log_err) = error_logger.log_parse_error(file_path, &err) {
                warn!("Failed to record parse error: {}", log_err);
            }
        }
    }

    // Export the final, possibly partial, batch.
    if !pending.is_empty() {
        exporter_manager.export_batch(&pending)?;
    }

    Ok(())
}
/// Runs the complete SQL log export task described by `cfg`.
///
/// The function builds a `SqllogParser` over the configured input directory,
/// creates and initializes the exporter manager and the error logger,
/// discovers the log files, and feeds each file through `process_log_file`.
/// Afterwards both the exporter manager and the error logger are finalized,
/// exporter statistics are logged, and a summary (exporter name and total
/// elapsed time) is written to the log.
///
/// When no log files are found, a warning is logged, both components are
/// finalized, and the function returns `Ok(())` without printing the summary.
///
/// # Errors
///
/// Propagates failures from creating or initializing the exporter manager,
/// creating the error logger, discovering log files, processing a file, or
/// finalizing either component. If processing a file fails, the remaining
/// files are skipped, but both components are still finalized before the
/// processing error is returned; finalization failures in that situation are
/// logged as warnings rather than replacing the original error.
pub fn handle_run(cfg: &Config) -> Result<()> {
    let total_start = Instant::now();
    info!("Starting SQL log export task");

    let parser = SqllogParser::new(cfg.sqllog.directory());
    info!("SQL log input directory: {}", parser.path().display());

    let mut exporter_manager = ExporterManager::from_config(cfg)?;
    info!("Using exporter: {}", exporter_manager.name());

    let mut error_logger = ErrorLogger::new(cfg.error.file())?;

    info!("Initializing exporters...");
    exporter_manager.initialize()?;

    info!("Parsing and exporting SQL logs (streaming)...");
    let log_files = parser.log_files()?;

    if log_files.is_empty() {
        warn!("No log files found");
        exporter_manager.finalize()?;
        error_logger.finalize()?;
        return Ok(());
    }

    info!("Found {} log files", log_files.len());

    // Stop at the first file that fails, but keep the error instead of
    // returning right away so that finalization below still runs.
    let processing = log_files.into_iter().try_for_each(|log_file| {
        let file_path = log_file.to_string_lossy();
        process_log_file(&file_path, &mut exporter_manager, &mut error_logger)
    });

    if processing.is_ok() {
        info!("Export finished...");
    }

    // Finalize both components unconditionally: batches exported and parse
    // errors recorded before a failure should not be left un-finalized, and
    // an exporter failure must not prevent the error logger from finalizing.
    let exporter_finalized = exporter_manager.finalize();
    let logger_finalized = error_logger.finalize();

    if let Err(err) = processing {
        // The processing error is the root cause; report finalization
        // problems without letting them mask it.
        if let Err(finalize_err) = exporter_finalized {
            warn!(
                "Failed to finalize exporter after processing error: {}",
                finalize_err
            );
        }
        if let Err(finalize_err) = logger_finalized {
            warn!(
                "Failed to finalize error logger after processing error: {}",
                finalize_err
            );
        }
        return Err(err);
    }

    exporter_finalized?;
    logger_finalized?;

    let total_elapsed = total_start.elapsed().as_secs_f64();
    exporter_manager.log_stats();

    info!("✓ SQL log export task completed!");
    info!(" - 导出器: {}", exporter_manager.name());
    info!(" - 总耗时: {:.3} 秒", total_elapsed);

    Ok(())
}