#[cfg(feature = "parallel")]
use rayon::prelude::*;
use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use crate::Adt;
use crate::AdtVersion;
use crate::error::Result;
/// Plain function-pointer form of a per-file callback.
///
/// It has the same shape as the `processor` closures accepted by
/// `process_parallel`: it takes an [`Adt`] and a path and returns a `Result<T>`.
///
/// NOTE(review): no use of this alias is visible in the code reviewed here,
/// which is presumably why `dead_code` is allowed — confirm it is still needed.
#[allow(dead_code)]
type ProcessCallback<T> = fn(&Adt, &Path) -> Result<T>;
/// Options controlling how a batch of files is processed.
///
/// The derived `Default` yields `max_threads = 0` and both flags set to `false`.
#[derive(Debug, Clone, Default)]
pub struct ParallelOptions {
/// Requested number of rayon worker threads. A value of `0` leaves the
/// thread-pool configuration untouched. Only the `parallel`-feature build of
/// `process_parallel` reads this field.
pub max_threads: usize,
/// NOTE(review): this flag is not read by any code visible in this file;
/// presumably it is meant to control whether a batch stops at the first
/// failing file — confirm against callers before relying on it.
pub continue_on_error: bool,
/// When `true` and the `mmap` feature is enabled, each file is loaded through
/// a memory map; in every other case the file is loaded with `Adt::from_path`.
pub use_mmap: bool,
}
/// Loads every file in `files` and runs `processor` on it, in parallel via rayon.
///
/// # Parameters
/// - `files`: paths of the files to load and process.
/// - `processor`: callback invoked with each loaded [`Adt`] and its path; it may
///   be called concurrently from several worker threads.
/// - `options`: `max_threads` selects the worker count (`0` uses rayon's
///   current/global pool); `use_mmap` is forwarded to the file loader.
///   `continue_on_error` is not consulted here: every file is always attempted.
///
/// # Returns
/// One `(path, result)` pair per input file, in the same order as `files`.
/// A per-file load or processing failure is reported inside that file's pair,
/// not as an outer error.
///
/// # Errors
/// Returns `AdtError::ParseError` only when a dedicated thread pool was requested
/// (`max_threads > 0`) and could not be built.
#[cfg(feature = "parallel")]
pub fn process_parallel<T, F>(
    files: &[PathBuf],
    processor: F,
    options: &ParallelOptions,
) -> Result<Vec<(PathBuf, Result<T>)>>
where
    F: Fn(&Adt, &Path) -> Result<T> + Send + Sync,
    T: Send,
{
    // The actual work, independent of which pool ends up executing it.
    let run = || -> Vec<(PathBuf, Result<T>)> {
        files
            .par_iter()
            .map(|file_path| {
                let result = process_single_file(file_path, &processor, options);
                (file_path.clone(), result)
            })
            .collect()
    };

    if options.max_threads == 0 {
        return Ok(run());
    }

    // Build a pool scoped to this call instead of configuring the global one:
    // `build_global` can succeed at most once per process, so a second call
    // with `max_threads > 0` (or a call made after rayon's global pool was
    // already initialised elsewhere) would fail before doing any work.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(options.max_threads)
        .build()
        .map_err(|e| {
            crate::error::AdtError::ParseError(format!("Failed to configure thread pool: {e}"))
        })?;

    // `install` runs the closure inside `pool`, so `par_iter` uses its workers.
    Ok(pool.install(run))
}
/// Sequential fallback used when the `parallel` feature is disabled.
///
/// Loads each file in `files` in order, runs `processor` on it, and returns one
/// `(path, result)` pair per input in the same order. Per-file failures are
/// stored in the pair; the outer `Result` is always `Ok`.
#[cfg(not(feature = "parallel"))]
pub fn process_parallel<T, F>(
    files: &[PathBuf],
    processor: F,
    options: &ParallelOptions,
) -> Result<Vec<(PathBuf, Result<T>)>>
where
    F: Fn(&Adt, &Path) -> Result<T>,
{
    let outcomes: Vec<(PathBuf, Result<T>)> = files
        .iter()
        .map(|path| {
            let outcome = process_single_file(path, &processor, options);
            (path.clone(), outcome)
        })
        .collect();
    Ok(outcomes)
}
/// Loads one file as an [`Adt`] and hands it to `processor`.
///
/// The memory-mapped loader is used only when `options.use_mmap` is set and
/// the `mmap` feature is compiled in; every other combination falls back to
/// `Adt::from_path`. Load errors are returned before `processor` is called.
fn process_single_file<T, F>(
    file_path: &Path,
    processor: &F,
    options: &ParallelOptions,
) -> Result<T>
where
    F: Fn(&Adt, &Path) -> Result<T>,
{
    let adt = match options.use_mmap {
        #[cfg(feature = "mmap")]
        true => load_adt_mmap(file_path)?,
        // Without the `mmap` feature the request silently degrades to a normal read.
        #[cfg(not(feature = "mmap"))]
        true => Adt::from_path(file_path)?,
        false => Adt::from_path(file_path)?,
    };
    processor(&adt, file_path)
}
/// Parses an [`Adt`] from `path` by memory-mapping the file and reading the
/// mapped bytes through an in-memory cursor.
///
/// Errors from opening or mapping the file, or from `Adt::from_reader`, are
/// propagated to the caller.
#[cfg(feature = "mmap")]
fn load_adt_mmap(path: &Path) -> Result<Adt> {
    let handle = File::open(path)?;
    // SAFETY: the mapping is read-only and is only accessed inside this
    // function. NOTE(review): soundness still relies on the file not being
    // truncated or modified by another process while it is mapped — confirm
    // that is acceptable for the inputs this crate handles.
    let mapping = unsafe { memmap2::Mmap::map(&handle) }?;
    let bytes: &[u8] = &mapping;
    let mut reader = std::io::Cursor::new(bytes);
    Adt::from_reader(&mut reader)
}
/// Converts every input file to `target_version` and writes the result into
/// `output_dir` under the input's file name.
///
/// # Parameters
/// - `input_files`: files to load and convert.
/// - `output_dir`: directory receiving the converted files; it is created if missing.
/// - `target_version`: version passed to `Adt::to_version`.
/// - `options`: forwarded to `process_parallel`.
///
/// # Returns
/// One `(input_path, result)` pair per input file, as produced by
/// `process_parallel`. A file's result is `Err` if loading, conversion,
/// directory creation, or writing (including the final flush) failed, or if
/// the input path has no final file-name component.
///
/// # Errors
/// The outer `Result` carries whatever error `process_parallel` itself reports.
pub fn batch_convert(
    input_files: &[PathBuf],
    output_dir: &Path,
    target_version: AdtVersion,
    options: &ParallelOptions,
) -> Result<Vec<(PathBuf, Result<()>)>> {
    let processor = move |adt: &Adt, input_path: &Path| -> Result<()> {
        let converted = adt.to_version(target_version)?;

        // A path such as `..` has no file name; report it as an error instead
        // of panicking (possibly inside a worker thread).
        let file_name = input_path.file_name().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("input path has no file name: {}", input_path.display()),
            )
        })?;
        let output_path = output_dir.join(file_name);

        // `create_dir_all` succeeds when the directory already exists, so no
        // separate (racy) existence check is needed.
        if let Some(parent) = output_path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let mut writer = BufWriter::new(File::create(&output_path)?);
        converted.write(&mut writer)?;
        // Flush explicitly: dropping a `BufWriter` discards flush errors, which
        // would report a truncated output file as a success.
        std::io::Write::flush(&mut writer)?;
        Ok(())
    };
    process_parallel(input_files, processor, options)
}
/// Validates every input file and, when `output_dir` is non-empty, also writes
/// a text report per file.
///
/// # Parameters
/// - `input_files`: files to load and validate.
/// - `output_dir`: directory for the text reports; an empty path disables
///   report writing. Each report is named after the input file with its
///   extension replaced by `txt`. The directory is created if missing.
/// - `validation_level`: level passed to `Adt::validate_with_report`.
/// - `options`: forwarded to `process_parallel`.
///
/// # Returns
/// One `(input_path, result)` pair per input file, as produced by
/// `process_parallel`. A file's result is `Err` if loading or validation
/// failed, if the report could not be written, or if a report was requested
/// and the input path has no final file-name component.
///
/// # Errors
/// The outer `Result` carries whatever error `process_parallel` itself reports.
pub fn batch_validate(
    input_files: &[PathBuf],
    output_dir: &Path,
    validation_level: crate::validator::ValidationLevel,
    options: &ParallelOptions,
) -> Result<Vec<(PathBuf, Result<crate::validator::ValidationReport>)>> {
    let processor = move |adt: &Adt,
                          input_path: &Path|
          -> Result<crate::validator::ValidationReport> {
        let report = adt.validate_with_report(validation_level)?;

        if !output_dir.as_os_str().is_empty() {
            use std::io::Write;

            // A path such as `..` has no file name; report it as an error
            // instead of panicking (possibly inside a worker thread).
            let file_name = input_path.file_name().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!("input path has no file name: {}", input_path.display()),
                )
            })?;
            let mut output_path = output_dir.join(file_name);
            output_path.set_extension("txt");

            // `create_dir_all` succeeds when the directory already exists, so
            // no separate (racy) existence check is needed.
            if let Some(parent) = output_path.parent() {
                std::fs::create_dir_all(parent)?;
            }

            let mut file = File::create(&output_path)?;
            writeln!(file, "Validation Report for {}", input_path.display())?;
            writeln!(file, "{}", report.format())?;
        }

        Ok(report)
    };
    process_parallel(input_files, processor, options)
}