use crate::error::{Error, ErrorStats, Result};
use crate::exporter::{CsvExporter, ExporterManager};
use crate::pipeline::{FieldMask, Pipeline};
use dm_database_parser_sqllog::Sqllog;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use super::collector;
/// Outcome of one parallel per-file task.
///
/// `Some((log_file, temp_part_path, row_count, file_error_stats))` when the
/// file was collected and written to its temporary CSV part; `None` when the
/// task did no work because the interrupt flag was already set when it started.
type TaskResult = Option<(PathBuf, PathBuf, usize, ErrorStats)>;
/// Concatenates CSV part files into a single output CSV.
///
/// The first part is copied verbatim, header line included. Every later part
/// has its first line (its header) dropped, so the result carries exactly one
/// header.
///
/// When `append_to_existing` is true:
/// - The output is opened in append mode (created if missing).
/// - The first line of *every* part is dropped, so no part header is written
///   into the existing file.
/// - If the existing file is non-empty and does not end with `\n`, a newline
///   is written first, so the first appended record is not glued onto the
///   file's last line. This check is best effort: an unreadable file is
///   treated as not needing a separator.
///
/// When `append_to_existing` is false, the output is opened with `create_new`
/// (failing if it already exists) unless `overwrite` is true. In that case an
/// existing file is truncated.
///
/// Part files are removed only after every part has been copied and the output
/// flushed. Removal failures are logged and otherwise ignored. The `usize` in
/// each `parts` entry is not used here.
///
/// Returns `Ok(())` without touching `output_path` when `parts` is empty.
///
/// # Errors
/// Fails if the output cannot be opened, a part cannot be opened or read, or
/// writing/flushing the output fails.
pub(super) fn concat_csv_parts(
    parts: &[(PathBuf, usize)],
    output_path: &Path,
    overwrite: bool,
    append_to_existing: bool,
) -> Result<()> {
    use std::fs::{File, OpenOptions};
    use std::io::{BufRead, BufReader, BufWriter, Write};

    /// Best-effort check: true only if `path` can be read, is non-empty, and
    /// its last byte is not `\n`.
    fn lacks_trailing_newline(path: &Path) -> bool {
        use std::io::{Read, Seek, SeekFrom};
        let check = || -> std::io::Result<bool> {
            let mut f = std::fs::File::open(path)?;
            if f.metadata()?.len() == 0 {
                return Ok(false);
            }
            f.seek(SeekFrom::End(-1))?;
            let mut last = [0u8; 1];
            f.read_exact(&mut last)?;
            Ok(last[0] != b'\n')
        };
        check().unwrap_or(false)
    }

    if parts.is_empty() {
        return Ok(());
    }

    // Inspected through a separate read-only handle before opening for append,
    // so the append handle itself is opened exactly as before.
    let needs_separator = append_to_existing && lacks_trailing_newline(output_path);

    let file = if append_to_existing {
        OpenOptions::new()
            .create(true)
            .append(true)
            .open(output_path)?
    } else {
        OpenOptions::new()
            .create_new(!overwrite)
            .create(overwrite)
            .write(true)
            .truncate(overwrite)
            .open(output_path)?
    };
    let mut writer = BufWriter::with_capacity(2 * 1024 * 1024, file);
    if needs_separator {
        writer.write_all(b"\n")?;
    }

    let mut header = Vec::with_capacity(256);
    for (idx, (part_path, _)) in parts.iter().enumerate() {
        let mut reader = BufReader::new(File::open(part_path)?);
        // Only the very first part written into a fresh file keeps its header.
        if idx > 0 || append_to_existing {
            header.clear();
            reader.read_until(b'\n', &mut header)?;
        }
        std::io::copy(&mut reader, &mut writer)?;
    }
    writer.flush()?;

    // Deleted only after a fully successful concatenation, so a failure above
    // leaves every part in place.
    for (part_path, _) in parts {
        if let Err(e) = std::fs::remove_file(part_path) {
            log::warn!("failed to remove temp part {}: {e}", part_path.display());
        }
    }
    Ok(())
}
/// Creates the scratch directory that receives per-file CSV parts and returns
/// its path.
///
/// The directory is named `.{stem}_parts_{pid}`, where `stem` is the output
/// file's stem and `pid` is the current process id. It is created beside the
/// output file, or in `.` when the output path has no parent component. If
/// that fails for any reason, the same name is created under
/// `std::env::temp_dir()` instead. An already-existing directory is reused.
///
/// # Errors
/// Returns an error only when creating the fallback directory also fails.
fn setup_parts_dir(output_path: &Path) -> Result<PathBuf> {
    let dir_name = {
        let stem = output_path.file_stem().unwrap_or_default().to_string_lossy();
        format!(".{stem}_parts_{}", std::process::id())
    };
    let base_dir = match output_path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };

    let beside_output = base_dir.join(&dir_name);
    match std::fs::create_dir_all(&beside_output) {
        Ok(()) => Ok(beside_output),
        Err(_) => {
            let in_temp_dir = std::env::temp_dir().join(&dir_name);
            std::fs::create_dir_all(&in_temp_dir)?;
            Ok(in_temp_dir)
        }
    }
}
/// Writes one file's collected rows to `temp_path` as a standalone CSV part.
///
/// Builds a `CsvExporter` configured with the supplied normalization flag,
/// field mask, column order, and performance-metrics flag, and wraps it in an
/// `ExporterManager`. It then runs initialize, one `export_one_preparsed` call
/// per row, and finalize. The optional string paired with each record is
/// passed through as `Option<&str>` (presumably the normalized SQL text;
/// confirm against `collector`).
///
/// # Returns
/// The number of rows handed to the exporter (`rows.len()`).
///
/// # Errors
/// Propagates the first error from initialization, any row export, or
/// finalization. Rows after a failing one are not exported.
fn write_records_to_csv(
    rows: Vec<(Sqllog, Option<String>)>,
    temp_path: &Path,
    include_performance_metrics: bool,
    do_normalize: bool,
    field_mask: FieldMask,
    ordered_indices: &[usize],
) -> Result<usize> {
    let mut manager = ExporterManager::from_csv({
        let mut csv = CsvExporter::new(temp_path);
        csv.normalize = do_normalize;
        csv.field_mask = field_mask;
        csv.ordered_indices = ordered_indices.to_vec();
        csv.include_performance_metrics = include_performance_metrics;
        csv
    });
    manager.initialize()?;

    let with_metrics = manager.csv_include_performance_metrics();
    let total = rows.len();
    for (entry, normalized_sql) in rows {
        manager.export_one_preparsed(&entry, with_metrics, normalized_sql.as_deref())?;
    }

    manager.finalize()?;
    Ok(total)
}
#[allow(clippy::too_many_arguments)]
fn run_parallel_tasks(
log_files: &[PathBuf],
csv_include_performance_metrics: bool,
pipeline: &Pipeline,
jobs: usize,
interrupted: &Arc<AtomicBool>,
do_normalize: bool,
placeholder_override: Option<bool>,
field_mask: FieldMask,
ordered_indices: &[usize],
parts_dir: &Path,
verbose: bool,
) -> Result<Vec<Result<TaskResult>>> {
use rayon::prelude::*;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(jobs)
.build()
.map_err(|e| Error::Io(std::io::Error::other(e)))?;
let results: Vec<Result<TaskResult>> = pool.install(|| {
log_files
.par_iter()
.enumerate()
.map(|(idx, file)| {
if interrupted.load(Ordering::Acquire) {
return Ok(None);
}
verbose.then(|| eprintln!("Processing: {}", file.display()));
let temp_path = parts_dir.join(format!("{idx:08}.csv"));
let (rows, file_stats) = collector::collect_log_file(
file,
pipeline,
do_normalize,
placeholder_override,
interrupted,
)?;
let count = write_records_to_csv(
rows,
&temp_path,
csv_include_performance_metrics,
do_normalize,
field_mask,
ordered_indices,
)?;
Ok(Some((file.clone(), temp_path, count, file_stats)))
})
.collect()
});
Ok(results)
}
fn collect_parallel_results(
results: Vec<Result<TaskResult>>,
) -> Result<(Vec<(PathBuf, PathBuf, usize)>, ErrorStats, usize)> {
let mut parts_info: Vec<(PathBuf, PathBuf, usize)> = Vec::with_capacity(results.len());
let mut parallel_stats = ErrorStats::default();
let mut first_err: Option<Error> = None;
let mut skipped = 0usize;
for result in results {
match result {
Ok(Some((orig, temp, count, file_stats))) => {
parallel_stats.merge(&file_stats);
parts_info.push((orig, temp, count));
}
Ok(None) => skipped += 1,
Err(e) => {
log::warn!("parallel collect error: {e}");
if first_err.is_none() {
first_err = Some(e);
}
}
}
}
if let Some(e) = first_err {
for (_, temp, _) in &parts_info {
let _ = std::fs::remove_file(temp);
}
return Err(e);
}
Ok((parts_info, parallel_stats, skipped))
}
fn finalize_concat(
parts_info: Vec<(PathBuf, PathBuf, usize)>,
output_path: &Path,
overwrite: bool,
append_to_existing: bool,
parts_dir: &Path,
skipped: usize,
parallel_stats: ErrorStats,
) -> Result<(Vec<(PathBuf, usize)>, usize, ErrorStats)> {
let parts_for_concat: Vec<(PathBuf, usize)> = parts_info
.iter()
.map(|(_, temp, count)| (temp.clone(), *count))
.collect();
let concat_result = concat_csv_parts(
&parts_for_concat,
output_path,
overwrite,
append_to_existing,
);
let _ = std::fs::remove_dir_all(parts_dir);
if concat_result.is_err() && !append_to_existing {
let _ = std::fs::remove_file(output_path);
}
concat_result?;
Ok((
parts_info
.into_iter()
.map(|(orig, _, count)| (orig, count))
.collect(),
skipped,
parallel_stats,
))
}
#[allow(clippy::too_many_arguments)]
pub(super) fn process_csv_parallel(
log_files: &[PathBuf],
cfg: &crate::config::Config,
pipeline: &Pipeline,
jobs: usize,
interrupted: &Arc<AtomicBool>,
do_normalize: bool,
placeholder_override: Option<bool>,
field_mask: FieldMask,
ordered_indices: &[usize],
verbose: bool,
) -> Result<(Vec<(PathBuf, usize)>, usize, ErrorStats)> {
let csv_cfg = cfg.exporter.csv.as_ref().ok_or_else(|| {
Error::Export(crate::error::ExportError::WriteFailed {
path: std::path::PathBuf::from("<csv>"),
reason: "parallel CSV path requires CSV exporter to be configured".into(),
})
})?;
let output_path = Path::new(&csv_cfg.file);
let append_to_existing = csv_cfg.append && output_path.exists();
let parts_dir = setup_parts_dir(output_path)?;
let results = run_parallel_tasks(
log_files,
csv_cfg.include_performance_metrics,
pipeline,
jobs,
interrupted,
do_normalize,
placeholder_override,
field_mask,
ordered_indices,
&parts_dir,
verbose,
)?;
let (parts_info, parallel_stats, skipped) = match collect_parallel_results(results) {
Ok(v) => v,
Err(e) => {
let _ = std::fs::remove_dir_all(&parts_dir);
return Err(e);
}
};
finalize_concat(
parts_info,
output_path,
csv_cfg.overwrite,
append_to_existing,
&parts_dir,
skipped,
parallel_stats,
)
}