use super::error_log::write_error_log;
use super::filter_processor::build_pipeline;
use super::input::{make_progress_bar, merge_trxid_prescan, resolve_input_files};
use super::parallel::process_csv_parallel;
use super::sequential::run_sequential;
use super::sqlite_parallel::process_sqlite_parallel;
use super::summary::print_run_summary;
use crate::config::Config;
use crate::error::{Error, ErrorStats, Result};
use crate::pipeline::{FIELD_NAMES, FieldMask, NormalizeConfig, OutputConfig, Pipeline};
use indicatif::ProgressBar;
use log::info;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
/// Per-run state derived once from the effective [`Config`] and shared by the
/// processing routes in this module.
///
/// Built by `build_run_context`; the lifetime `'a` ties the context to the
/// borrowed configuration it was derived from.
struct RunContext<'a> {
/// The configuration this context was built from.
cfg: &'a Config,
/// Pipeline produced by `build_pipeline(cfg)`.
pipeline: Pipeline,
/// Output field mask taken from `cfg.output`, or `FieldMask::ALL` when no
/// output section is configured.
field_mask: FieldMask,
/// Field indices in output order, taken from `cfg.output`; when no output
/// section is configured this is `0..FIELD_NAMES.len()`.
ordered_indices: Vec<usize>,
/// `true` when the field mask includes normalized SQL and
/// `cfg.replace_parameters` is either absent or has `enable` set.
do_normalize: bool,
/// Result of `NormalizeConfig::placeholder_override` on
/// `cfg.replace_parameters`; `None` when that section is absent.
placeholder_override: Option<bool>,
}
/// Derives the [`RunContext`] for a run from `cfg`.
///
/// Defaults applied when optional config sections are missing:
/// - no `output` section: every field is selected (`FieldMask::ALL`) and the
///   field order is `0..FIELD_NAMES.len()`;
/// - no `replace_parameters` section: normalization is treated as enabled and
///   there is no placeholder override.
///
/// Normalization is only switched on when the selected fields actually include
/// the normalized SQL column.
fn build_run_context(cfg: &Config) -> RunContext<'_> {
    let pipeline = build_pipeline(cfg);

    // Both sections are optional; borrow them once and branch explicitly.
    let output = cfg.output.as_ref();
    let replace = cfg.replace_parameters.as_ref();

    let field_mask = match output {
        Some(out) => out.field_mask(),
        None => FieldMask::ALL,
    };
    let ordered_indices: Vec<usize> = match output {
        Some(out) => out.ordered_field_indices(),
        None => (0..FIELD_NAMES.len()).collect(),
    };

    // A missing `replace_parameters` section counts as "enabled".
    let normalization_enabled = match replace {
        Some(params) => params.enable,
        None => true,
    };
    let do_normalize = field_mask.includes_normalized_sql() && normalization_enabled;

    let placeholder_override = match replace {
        Some(params) => params.placeholder_override(),
        None => None,
    };

    RunContext {
        cfg,
        pipeline,
        field_mask,
        ordered_indices,
        do_normalize,
        placeholder_override,
    }
}
/// Outcome shared by every processing route in this module:
/// `(processed files paired with a per-file count, number of skipped files,
/// error statistics)`. The per-file counts are summed into the exported-record
/// total by `finalize_run`.
type ProcessResult = Result<(Vec<(PathBuf, usize)>, usize, ErrorStats)>;
/// Processes `log_files` through `process_csv_parallel` using `jobs` jobs.
///
/// When `verbose` is set, a one-line notice with the file and job counts is
/// written to stderr before processing starts. The same mode is also reported
/// through the `log` facade at info level.
///
/// # Errors
///
/// Propagates any error returned by `process_csv_parallel`.
fn run_csv_parallel(
    ctx: &RunContext<'_>,
    log_files: &[PathBuf],
    jobs: usize,
    verbose: bool,
    interrupted: &Arc<AtomicBool>,
) -> ProcessResult {
    let file_count = log_files.len();
    if verbose {
        eprintln!(
            "Processing {} files in parallel ({} jobs)",
            file_count, jobs
        );
    }
    info!("Parsing and exporting SQL logs (parallel, {jobs} jobs)...");

    // `?` is kept so the error path is identical to a destructure-and-rebuild.
    let outcome = process_csv_parallel(
        log_files,
        ctx.cfg,
        &ctx.pipeline,
        jobs,
        interrupted,
        ctx.do_normalize,
        ctx.placeholder_override,
        ctx.field_mask,
        &ctx.ordered_indices,
        verbose,
    )?;
    Ok(outcome)
}
/// Processes `log_files` through `process_sqlite_parallel` using `jobs` jobs.
///
/// When `verbose` is set, a one-line notice with the file and job counts is
/// written to stderr before processing starts. The same mode is also reported
/// through the `log` facade at info level. Unlike the CSV route, `verbose` is
/// not forwarded to the underlying processor.
///
/// # Errors
///
/// Propagates any error returned by `process_sqlite_parallel`.
fn run_sqlite_parallel(
    ctx: &RunContext<'_>,
    log_files: &[PathBuf],
    jobs: usize,
    verbose: bool,
    interrupted: &Arc<AtomicBool>,
) -> ProcessResult {
    let file_count = log_files.len();
    if verbose {
        eprintln!(
            "Processing {} files in parallel ({} jobs)",
            file_count, jobs
        );
    }
    info!("Parsing and exporting SQL logs (SQLite parallel, {jobs} jobs)...");

    // `?` is kept so the error path is identical to a destructure-and-rebuild.
    let outcome = process_sqlite_parallel(
        log_files,
        ctx.cfg,
        &ctx.pipeline,
        jobs,
        interrupted,
        ctx.do_normalize,
        ctx.placeholder_override,
        ctx.field_mask,
        &ctx.ordered_indices,
    )?;
    Ok(outcome)
}
/// Chooses a processing route for `log_files` and runs it.
///
/// A parallel route is eligible only when more than one job is available,
/// there is more than one input file, and input is not a stdin pipe. Among the
/// eligible exporters CSV is checked first, then SQLite. Every other case goes
/// through `run_sequential`, which reports no skipped-file count, so that
/// position of the result is `0`.
///
/// `pb` is handed to the sequential route together with a flag saying whether
/// a progress bar exists; the parallel routes do not receive it.
///
/// # Errors
///
/// Propagates the error of whichever route was taken.
fn route_processing(
    ctx: &RunContext<'_>,
    log_files: &[PathBuf],
    jobs: usize,
    is_stdin_pipe: bool,
    verbose: bool,
    quiet: bool,
    pb: Option<&ProgressBar>,
    interrupted: &Arc<AtomicBool>,
) -> ProcessResult {
    let parallel_eligible = !is_stdin_pipe && jobs > 1 && log_files.len() > 1;
    if parallel_eligible {
        let exporter = &ctx.cfg.exporter;
        // CSV wins over SQLite when both exporters are configured.
        if exporter.csv.is_some() {
            return run_csv_parallel(ctx, log_files, jobs, verbose, interrupted);
        }
        if exporter.sqlite.is_some() {
            return run_sqlite_parallel(ctx, log_files, jobs, verbose, interrupted);
        }
    }

    let has_progress_bar = pb.is_some();
    let (exported, seq_stats) = run_sequential(
        log_files,
        ctx.cfg,
        &ctx.pipeline,
        ctx.do_normalize,
        ctx.placeholder_override,
        verbose,
        quiet,
        has_progress_bar,
        pb,
        interrupted,
    )?;
    Ok((exported, 0, seq_stats))
}
/// Wraps up a run after processing has returned.
///
/// In order: clears the progress bar (if any), stores the sum of the per-file
/// counts in `run_stats.records_exported`, prints the run summary, and hands
/// the stats to `write_error_log`.
///
/// `elapsed` is forwarded unchanged to the summary; the caller supplies it in
/// seconds.
fn finalize_run(
    cfg: &Config,
    run_stats: &mut ErrorStats,
    processed_files: &[(PathBuf, usize)],
    skipped_files: usize,
    pb: Option<&ProgressBar>,
    use_parallel: bool,
    quiet: bool,
    verbose: bool,
    elapsed: f64,
) {
    // Remove the bar before anything else is printed by the summary.
    if let Some(bar) = pb {
        bar.finish_and_clear();
    }

    let total_records: usize = processed_files.iter().map(|entry| entry.1).sum();
    run_stats.records_exported = total_records;

    print_run_summary(
        quiet,
        verbose,
        use_parallel,
        elapsed,
        processed_files,
        total_records,
        skipped_files,
        run_stats,
    );
    write_error_log(cfg, run_stats);
}
pub fn handle_run(
cfg: &Config,
quiet: bool,
verbose: bool,
interrupted: &Arc<AtomicBool>,
jobs_override: Option<usize>,
) -> Result<ErrorStats> {
let total_start = Instant::now();
let mut run_stats = ErrorStats::default();
let (log_files, is_stdin_pipe) = resolve_input_files(cfg)?;
let jobs = jobs_override
.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, std::num::NonZero::get));
let merged = merge_trxid_prescan(cfg, &log_files, jobs, is_stdin_pipe, quiet)?;
let final_cfg: &Config = merged.as_ref().unwrap_or(cfg);
let ctx = build_run_context(final_cfg);
let use_parallel = (jobs > 1 && log_files.len() > 1 && !is_stdin_pipe)
&& (final_cfg.exporter.csv.is_some() || final_cfg.exporter.sqlite.is_some());
let show_progress = !quiet && !verbose && !use_parallel;
let pb = make_progress_bar(show_progress, log_files.len());
let (processed_files, skipped_files, stats) = route_processing(
&ctx,
&log_files,
jobs,
is_stdin_pipe,
verbose,
quiet,
pb.as_ref(),
interrupted,
)?;
run_stats.merge(&stats);
finalize_run(
final_cfg,
&mut run_stats,
&processed_files,
skipped_files,
pb.as_ref(),
use_parallel,
quiet,
verbose,
total_start.elapsed().as_secs_f64(),
);
if interrupted.load(Ordering::Acquire) {
return Err(Error::Interrupted);
}
Ok(run_stats)
}