use crate::error::{Error, Result};
use crate::error_logger::ErrorLogger;
use crate::exporter::ExporterManager;
use crate::parser::SqllogParser;
use crate::{config::Config, error::ParserError};
use dm_database_parser_sqllog::LogParser;
use log::{info, warn};
use std::time::Instant;
fn process_log_file(
file_path: &str,
file_index: usize,
total_files: usize,
exporter_manager: &mut ExporterManager,
error_logger: &mut ErrorLogger,
) -> Result<()> {
let file_start = Instant::now();
eprintln!("[{file_index}/{total_files}] Processing: {file_path}");
info!("Processing file {file_index}/{total_files}: {file_path}");
let parser = LogParser::from_path(file_path).map_err(|e| {
Error::Parser(ParserError::InvalidPath {
path: file_path.into(),
reason: format!("{e}"),
})
})?;
let mut batch = Vec::with_capacity(1000);
let mut records_in_file = 0;
let mut errors_in_file = 0;
for result in parser.iter() {
match result {
Ok(record) => {
batch.push(record);
records_in_file += 1;
if batch.len() >= 1000 {
exporter_manager.export_batch(&batch)?;
batch.clear();
eprintln!(" [Progress] {records_in_file} records processed");
}
}
Err(e) => {
errors_in_file += 1;
if !batch.is_empty() {
exporter_manager.export_batch(&batch)?;
batch.clear();
}
if let Err(log_err) = error_logger.log_parse_error(file_path, &e) {
warn!("Failed to record parse error: {log_err}");
}
}
}
}
if !batch.is_empty() {
exporter_manager.export_batch(&batch)?;
}
let file_elapsed = file_start.elapsed();
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_lossless
)]
let throughput = if file_elapsed.as_secs_f64() > 0.0 {
(records_in_file as f64 / file_elapsed.as_secs_f64()).round() as usize
} else {
0
};
let file_secs = file_elapsed.as_secs_f64();
eprintln!(
" [Complete] {records_in_file} records, {errors_in_file} errors, {file_secs:.2}s, {throughput} records/sec"
);
info!(
"File {file_path} processed: {records_in_file} records, {errors_in_file} errors, {file_secs:.2}s"
);
Ok(())
}
pub fn handle_run(cfg: &Config) -> Result<()> {
let total_start = Instant::now();
info!("Starting SQL log export task");
let parser = SqllogParser::new(cfg.sqllog.directory());
info!("SQL log input directory: {}", parser.path().display());
let mut exporter_manager = ExporterManager::from_config(cfg)?;
info!("Using exporter: {}", exporter_manager.name());
let mut error_logger = ErrorLogger::new(cfg.error.file())?;
info!("Initializing exporters...");
exporter_manager.initialize()?;
info!("Parsing and exporting SQL logs (streaming)...");
let log_files = parser.log_files()?;
if log_files.is_empty() {
warn!("No log files found");
exporter_manager.finalize()?;
error_logger.finalize()?;
return Ok(());
}
info!("Found {} log file(s)", log_files.len());
for (idx, log_file) in log_files.iter().enumerate() {
let file_path_str = log_file.to_string_lossy().to_string();
process_log_file(
&file_path_str,
idx + 1,
log_files.len(),
&mut exporter_manager,
&mut error_logger,
)?;
}
info!("Export finished...");
exporter_manager.finalize()?;
error_logger.finalize()?;
let total_elapsed = total_start.elapsed();
exporter_manager.log_stats();
eprintln!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
eprintln!("✓ SQL Log Export Task Completed");
eprintln!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
eprintln!(" Exporter: {}", exporter_manager.name());
let elapsed_secs = total_elapsed.as_secs_f64();
eprintln!(" Elapsed: {elapsed_secs:.2} seconds");
if let Some(stats) = exporter_manager.stats() {
let elapsed_millis = total_elapsed.as_millis();
let throughput = if elapsed_millis > 0 {
(stats.exported as u128 * 1_000) / elapsed_millis
} else {
0
};
eprintln!(" Records: {}", stats.exported);
eprintln!(" Throughput: {throughput} records/sec");
}
eprintln!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
info!("✓ SQL log export task completed!");
Ok(())
}