use crate::error::{Error, ErrorStats, Result};
use crate::exporter::ExporterManager;
use crate::pipeline::{FieldMask, Pipeline};
use dm_database_parser_sqllog::Sqllog;
use rayon::prelude::*;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Per-file outcomes produced by `run_parallel_parse`, one entry per input file.
///
/// Each entry is one of:
/// - `Ok(Some((path, rows, stats)))` — the file was collected. `rows` pairs every
///   parsed [`Sqllog`] with an optional string that the exporter later receives
///   as `normalized`.
/// - `Ok(None)` — the file was not started because interruption was requested.
/// - `Err(..)` — collecting that file failed.
type ParseResults = Vec<
    Result<
        Option<(
            PathBuf,
            Vec<(Sqllog, Option<String>)>,
            ErrorStats,
        )>,
    >,
>;
fn run_parallel_parse(
log_files: &[PathBuf],
pipeline: &Pipeline,
jobs: usize,
do_normalize: bool,
placeholder_override: Option<bool>,
interrupted: &Arc<AtomicBool>,
) -> Result<ParseResults> {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(jobs)
.build()
.map_err(|e| Error::Io(std::io::Error::other(e)))?;
Ok(pool.install(|| {
log_files
.par_iter()
.map(|file| {
if interrupted.load(Ordering::Acquire) {
return Ok(None);
}
let (rows, file_stats) = super::collector::collect_log_file(
file,
pipeline,
do_normalize,
placeholder_override,
interrupted,
)?;
Ok(Some((file.clone(), rows, file_stats)))
})
.collect()
}))
}
fn parallel_collect(
log_files: &[PathBuf],
pipeline: &Pipeline,
jobs: usize,
do_normalize: bool,
placeholder_override: Option<bool>,
interrupted: &Arc<AtomicBool>,
) -> Result<(
Vec<(PathBuf, Vec<(Sqllog, Option<String>)>)>,
usize,
ErrorStats,
)> {
let results = run_parallel_parse(
log_files,
pipeline,
jobs,
do_normalize,
placeholder_override,
interrupted,
)?;
let mut collected: Vec<(PathBuf, Vec<(Sqllog, Option<String>)>)> =
Vec::with_capacity(log_files.len());
let mut first_err: Option<Error> = None;
let mut skipped = 0usize;
let mut merged_stats = ErrorStats::default();
for result in results {
match result {
Ok(Some((path, rows, file_stats))) => {
merged_stats.merge(&file_stats);
collected.push((path, rows));
}
Ok(None) => skipped += 1,
Err(e) => {
log::warn!("parallel collect error: {e}");
if first_err.is_none() {
first_err = Some(e);
}
}
}
}
if let Some(e) = first_err {
return Err(e);
}
Ok((collected, skipped, merged_stats))
}
/// Parses `log_files` in parallel, then writes every collected row through one
/// `ExporterManager` built from `cfg`.
///
/// All parsing finishes before the exporter is created. The export phase then
/// runs on the calling thread, file by file, in input order. Before any row is
/// written, the exporter is created with `from_config`, `initialize` is called,
/// and `set_sqlite_wal_mode` is called. `finalize` runs once every file has
/// been exported.
///
/// `_field_mask` and `_ordered_indices` are accepted but not used here.
/// NOTE(review): presumably they are kept for signature parity with another
/// processing path; confirm with the caller.
///
/// Returns `(per_file_counts, skipped, stats)`: the number of rows exported for
/// each collected file, the number of files skipped because of interruption,
/// and the merged parse statistics.
///
/// # Errors
///
/// Propagates the first collection error, or any error from building,
/// initializing, configuring, writing to, or finalizing the exporter. An export
/// error returns immediately, so `finalize` is not called in that case.
// Nine parameters; presumably kept flat to match the caller's dispatch interface.
#[allow(clippy::too_many_arguments)]
pub(super) fn process_sqlite_parallel(
    log_files: &[PathBuf],
    cfg: &crate::config::Config,
    pipeline: &Pipeline,
    jobs: usize,
    interrupted: &Arc<AtomicBool>,
    do_normalize: bool,
    placeholder_override: Option<bool>,
    _field_mask: FieldMask,
    _ordered_indices: &[usize],
) -> Result<(Vec<(PathBuf, usize)>, usize, ErrorStats)> {
    let (parsed_files, skipped, stats) = parallel_collect(
        log_files,
        pipeline,
        jobs,
        do_normalize,
        placeholder_override,
        interrupted,
    )?;

    if stats.parse_errors > 0 {
        log::warn!(
            "SQLite parallel: {} parse error(s) across all files",
            stats.parse_errors
        );
    }

    let mut exporter = ExporterManager::from_config(cfg)?;
    exporter.initialize()?;
    exporter.set_sqlite_wal_mode()?;

    // Export each file's rows in order. Collecting into `Result<Vec<_>>` stops
    // at the first failing row, exactly like an early `?` return would.
    let per_file_counts = parsed_files
        .into_iter()
        .map(|(path, rows)| -> Result<(PathBuf, usize)> {
            for (record, normalized) in &rows {
                // NOTE(review): the meaning of the `true` flag is defined by
                // `ExporterManager::export_one_preparsed` — confirm there.
                exporter.export_one_preparsed(record, true, normalized.as_deref())?;
            }
            Ok((path, rows.len()))
        })
        .collect::<Result<Vec<_>>>()?;

    exporter.finalize()?;
    Ok((per_file_counts, skipped, stats))
}