use crate::error::{
ErrorStats, ParseErrorRecord, Result, classify_error_kind, truncate_to_120_chars,
};
use crate::exporter::ExporterManager;
use crate::pipeline::Pipeline;
use crate::pipeline::normalizer::ParamBuffer;
use dm_database_parser_sqllog::ParseError;
use dm_database_parser_sqllog::Sqllog;
use indicatif::ProgressBar;
use log::info;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
/// Outcome of handling one parsed record, telling the per-file loop whether to go on.
pub(super) enum ExportAction {
/// Keep iterating: the record was filtered out, exported, or hit a non-fatal export error.
Continue,
/// Stop iterating: the number of records exported from this file has reached the quota.
BreakQuota,
/// Stop iterating: the exporter reported an error it classifies as fatal.
BreakFatal,
}
/// Runs the normalization step for `record` and throws the result away.
///
/// Used for records that will not be exported. The call is made only for what it does
/// to the mutable arguments (presumably keeping `params_buffer` in sync with the record
/// stream — confirm against `crate::pipeline::compute_normalized`).
fn update_params_buffer_only(
    record: &Sqllog,
    params_buffer: &mut ParamBuffer,
    placeholder_override: Option<bool>,
    ns_scratch: &mut Vec<u8>,
) {
    // The normalized SQL itself is of no interest here, so it is deliberately discarded.
    _ = crate::pipeline::compute_normalized(
        record,
        &record.sql,
        params_buffer,
        placeholder_override,
        ns_scratch,
    );
}
#[allow(clippy::too_many_arguments)]
pub(super) fn normalize_and_export(
record: &Sqllog,
exporter_manager: &mut ExporterManager,
include_pm: bool,
do_normalize: bool,
params_buffer: &mut ParamBuffer,
placeholder_override: Option<bool>,
ns_scratch: &mut Vec<u8>,
remaining: Option<usize>,
records_in_file: &mut usize,
file_stats: &mut ErrorStats,
file_path: &str,
passes: bool,
) -> ExportAction {
if !passes {
if do_normalize && record.tag.is_none() {
update_params_buffer_only(record, params_buffer, placeholder_override, ns_scratch);
}
file_stats.filtered_out += 1;
return ExportAction::Continue;
}
let ns = if do_normalize && (!params_buffer.is_empty() || record.tag.is_none()) {
crate::pipeline::compute_normalized(
record,
&record.sql,
params_buffer,
placeholder_override,
ns_scratch,
)
} else {
None
};
if let Some(remaining) = remaining {
if *records_in_file >= remaining {
return ExportAction::BreakQuota;
}
}
let export_result = exporter_manager.export_one_preparsed(record, include_pm, ns);
match export_result {
Ok(()) => {
*records_in_file += 1;
ExportAction::Continue
}
Err(ref e) if e.is_fatal() => {
file_stats.set_fatal(e.to_string());
eprintln!("[{}] {file_path}: {e}", e.severity());
log::warn!("{file_path} | fatal export error: {export_result:?}");
ExportAction::BreakFatal
}
Err(ref e) => {
file_stats.add_export_error();
eprintln!("[{}] {file_path}: {e}", e.severity());
log::warn!("{file_path} | export error: {export_result:?}");
ExportAction::Continue
}
}
}
/// Puts a `[index/total] file` label on the progress bar before a file is processed.
///
/// Nothing happens unless `reset_pb` and `show_progress` are both `true` and a bar is given.
fn setup_progress_bar(
    pb: Option<&ProgressBar>,
    reset_pb: bool,
    show_progress: bool,
    file_index: usize,
    total_files: usize,
    file_name: &str,
) {
    match pb {
        Some(bar) if reset_pb && show_progress => {
            bar.set_message(format!("[{file_index}/{total_files}] {file_name}"));
        }
        _ => {}
    }
}
/// Reports the outcome of one processed file to the log and, optionally, the progress bar.
///
/// A warn-level line is emitted when `errors_in_file` is non-zero, followed by an
/// info-level summary. When `show_progress` is set and a bar is supplied, the bar message
/// is replaced by a completion line and the bar is advanced by one.
// All values are independent scalars taken from the caller's loop state.
#[allow(clippy::too_many_arguments)]
fn log_file_result(
    pb: Option<&ProgressBar>,
    show_progress: bool,
    file_path: &str,
    file_index: usize,
    total_files: usize,
    records_in_file: usize,
    errors_in_file: usize,
    elapsed: f64,
) {
    let had_errors = errors_in_file != 0;
    if had_errors {
        log::warn!("{file_path}: {errors_in_file} parse errors");
    }
    info!(
        "File {file_path}: {records_in_file} records, {errors_in_file} errors, total {elapsed:.2}s",
    );

    if let Some(bar) = pb.filter(|_| show_progress) {
        // The error suffix is only shown when there is something to report.
        let errors_label = match errors_in_file {
            0 => String::new(),
            _ => format!(", {errors_in_file} errors"),
        };
        bar.set_message(format!(
            "✓ [{file_index}/{total_files}] {file_path} — {records_in_file}{errors_label}, {elapsed:.2}s",
        ));
        bar.inc(1);
    }
}
/// Periodic housekeeping for the record loop: refresh the throughput label and poll the
/// interrupt flag.
///
/// Work is done only when `records_in_file` is a non-zero multiple of 1024; on every other
/// call the function returns `false` immediately. Returns `true` when `interrupted` is set
/// at such a checkpoint, meaning the caller should stop.
fn tick_progress(
    pb: Option<&ProgressBar>,
    records_in_file: usize,
    file_start: std::time::Instant,
    file_name: &str,
    interrupted: &Arc<AtomicBool>,
) -> bool {
    let at_checkpoint = records_in_file != 0 && records_in_file % 1024 == 0;
    if !at_checkpoint {
        return false;
    }

    if let Some(bar) = pb {
        // Clamp the divisor so a checkpoint hit almost instantly cannot divide by zero.
        let seconds = file_start.elapsed().as_secs_f64().max(1e-9);
        #[allow(clippy::cast_precision_loss)]
        let rec_per_s = records_in_file as f64 / seconds;
        let speed_label = if rec_per_s >= 10_000.0 {
            format!("{:.0}k rec/s", rec_per_s / 1000.0)
        } else {
            format!("{rec_per_s:.0} rec/s")
        };
        bar.set_message(format!("{file_name} | {speed_label}"));
    }

    interrupted.load(Ordering::Acquire)
}
/// Parses one log file and feeds every record through the filter pipeline and exporters.
///
/// `params_buffer` is cleared first, then a parser is built for `file_path` and iterated:
///
/// * A record rejected by `pipeline` that is also not needed for normalization (either
///   `do_normalize` is off or the record carries a tag) is counted in
///   `ErrorStats::filtered_out` and skipped.
/// * Every other record goes to [`normalize_and_export`]; the loop stops on
///   [`ExportAction::BreakQuota`] or [`ExportAction::BreakFatal`].
/// * A parse error is counted, classified, logged at warn level, and its details are kept
///   up to a fixed per-file cap.
/// * `interrupted` is polled periodically on every path that handles a parsed record, so
///   a file whose records are all filtered out can still be interrupted.
///
/// Returns the number of records exported from this file together with the collected
/// statistics.
///
/// # Errors
///
/// Propagates the error from `crate::scanner::build_parser` if the parser cannot be built.
#[rustfmt::skip]
pub(super) fn process_log_file(
    file_path: &str, file_index: usize, total_files: usize,
    exporter_manager: &mut ExporterManager, pipeline: &Pipeline,
    show_progress: bool, remaining: Option<usize>, interrupted: &Arc<AtomicBool>,
    do_normalize: bool, placeholder_override: Option<bool>,
    params_buffer: &mut ParamBuffer, ns_scratch: &mut Vec<u8>,
    reset_pb: bool, pb: Option<&ProgressBar>,
) -> Result<(usize, ErrorStats)> {
    // Cap on detailed parse-error records kept per file; errors beyond it are only counted.
    const MAX_PARSE_ERROR_RECORDS: usize = 10_000;

    params_buffer.clear();
    let include_pm = exporter_manager.csv_include_performance_metrics();
    let file_start = Instant::now();
    let path = std::path::Path::new(file_path);
    let file_name = path.file_name()
        .map_or_else(|| file_path.to_string(), |n| n.to_string_lossy().into_owned());
    setup_progress_bar(pb, reset_pb, show_progress, file_index, total_files, &file_name);
    let parser = crate::scanner::build_parser(path)?;

    let mut records_in_file = 0usize;
    let mut errors_in_file = 0usize;
    let mut file_stats = ErrorStats::default();
    // Counts every successfully parsed record (exported or not); used only to throttle
    // interrupt polling to once per 1024 records on the paths that export nothing.
    let mut total_processed = 0usize;

    for result in parser.iter() {
        match result {
            Ok(record) => {
                total_processed = total_processed.wrapping_add(1);
                let passes = pipeline.is_empty() || pipeline.run_with_meta(&record);
                let needs_processing = passes || (do_normalize && record.tag.is_none());
                if !needs_processing {
                    // Fast path: rejected by the filter and irrelevant for normalization.
                    // It still has to be counted and must not starve the interrupt check.
                    file_stats.filtered_out += 1;
                    if total_processed.trailing_zeros() >= 10
                        && interrupted.load(Ordering::Acquire)
                    {
                        break;
                    }
                    continue;
                }
                let action = normalize_and_export(
                    &record, exporter_manager, include_pm, do_normalize,
                    params_buffer, placeholder_override, ns_scratch,
                    remaining, &mut records_in_file, &mut file_stats, file_path, passes,
                );
                match action {
                    ExportAction::BreakQuota | ExportAction::BreakFatal => break,
                    ExportAction::Continue => {
                        let stop = if passes {
                            tick_progress(pb, records_in_file, file_start, &file_name, interrupted)
                        } else {
                            total_processed.trailing_zeros() >= 10
                                && interrupted.load(Ordering::Acquire)
                        };
                        if stop {
                            break;
                        }
                    }
                }
            }
            Err(e) => {
                errors_in_file += 1;
                // Only `InvalidFormat` carries the offending line; other variants are
                // recorded with line 0 and an empty raw text.
                let (line_number, raw_ref) = match &e {
                    ParseError::InvalidFormat { raw, line_number } => (*line_number, raw.as_str()),
                    _ => (0u64, ""),
                };
                let kind = classify_error_kind(raw_ref);
                file_stats.add_parse_error_with_kind(kind);
                if file_stats.parse_error_records.len() < MAX_PARSE_ERROR_RECORDS {
                    file_stats.parse_error_records.push(ParseErrorRecord {
                        line_number,
                        raw_truncated: truncate_to_120_chars(raw_ref),
                        kind,
                    });
                }
                log::warn!("{file_path} | {e:?}");
            }
        }
    }

    let elapsed = file_start.elapsed().as_secs_f64();
    log_file_result(pb, show_progress, file_path, file_index, total_files, records_in_file, errors_in_file, elapsed);
    Ok((records_in_file, file_stats))
}