use super::error::{e_exit, e_println, ok_println};
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Concatenates several input files into one output file (see `combine_all`).
pub struct ConvertCombine;

/// Inputs strictly larger than this many bytes (1 GiB) are copied in chunks with progress output.
const LARGE_FILE_THRESHOLD: u64 = 1 << 30;
/// Bytes copied per write when streaming a large input (16 MiB).
const CHUNK_SIZE: usize = 16 << 20;
/// Minimum time between two progress reports while streaming a large input.
const PROGRESS_INTERVAL: Duration = Duration::from_secs(1);
impl ConvertCombine {
pub fn combine_all(paths: Vec<PathBuf>, output: PathBuf) {
let output_file = match File::create(&output) {
Ok(f) => f,
Err(e) => e_exit("FILE_CREATE", &format!("Failed to create file: {}", e), 1),
};
let buffered_output = BufWriter::with_capacity(8 * 1024 * 1024, output_file);
let output = Arc::new(Mutex::new(buffered_output));
let total_processed = Arc::new(Mutex::new(0u64));
paths.par_iter().for_each(|path| {
let process_result = || -> std::io::Result<()> {
let file = File::open(path)?;
let file_size = file.metadata()?.len();
let mmap = unsafe { Mmap::map(&file)? };
if file_size > LARGE_FILE_THRESHOLD {
Self::process_large(&mmap, &output, &total_processed)
} else {
Self::process_small(&mmap, &output)
}
};
match process_result() {
Ok(_) => ok_println("Merge", &format!("{}", path.display())),
Err(e) => e_println("PROCESS_ERROR", &format!("Failed to process file [{}]: {}", path.display(), e)),
}
});
if let Ok(mut writer) = output.lock() {
let _ = writer.flush();
}
ok_println("MERGE_COMPLETE", "");
}
fn process_small(data: &Mmap, output: &Arc<Mutex<BufWriter<File>>>) -> std::io::Result<()> {
let mut writer = output.lock()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire lock"))?;
writer.write_all(data)?;
Ok(())
}
fn process_large(
data: &Mmap,
output: &Arc<Mutex<BufWriter<File>>>,
total_processed: &Arc<Mutex<u64>>
) -> std::io::Result<()> {
let mut pos = 0;
let mut last_report_time = Instant::now();
let mut last_report_pos = 0;
while pos < data.len() {
let end = std::cmp::min(pos + CHUNK_SIZE, data.len());
let chunk = &data[pos..end];
{
let mut writer = output.lock()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire lock"))?;
writer.write_all(chunk)?;
}
pos = end;
let now = Instant::now();
if now.duration_since(last_report_time) >= PROGRESS_INTERVAL {
let bytes_written = pos - last_report_pos;
let elapsed = now.duration_since(last_report_time).as_secs_f64();
let mb_per_sec = (bytes_written as f64 / 1_048_576.0) / elapsed;
if let Ok(mut total) = total_processed.lock() {
*total += bytes_written as u64;
}
ok_println("PROGRESS", &format!("Processed {:.2} MB, Speed: {:.2} MB/s",
pos as f64 / 1_048_576.0, mb_per_sec));
last_report_time = now;
last_report_pos = pos;
}
}
Ok(())
}
}