use super::error::{e_exit, e_println, ok_println};
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
9
/// Namespace for merging multiple input files into one output file.
/// Unit struct — all functionality lives in associated functions on the
/// `impl` block below; there is no instance state.
pub struct ConvertCombine;
12
13const LARGE_FILE_THRESHOLD: u64 = 1_073_741_824; const CHUNK_SIZE: usize = 16 * 1024 * 1024; const PROGRESS_INTERVAL: Duration = Duration::from_secs(1); impl ConvertCombine {
18
19 pub fn combine_all(paths: Vec<PathBuf>, output: PathBuf) {
26 let output_file = match File::create(&output) {
28 Ok(f) => f,
29 Err(e) => e_exit("FILE_CREATE", &format!("Failed to create file: {}", e), 1),
30 };
31 let buffered_output = BufWriter::with_capacity(8 * 1024 * 1024, output_file);
32 let output = Arc::new(Mutex::new(buffered_output));
33
34 let total_processed = Arc::new(Mutex::new(0u64));
36
37 paths.par_iter().for_each(|path| {
38 let process_result = || -> std::io::Result<()> {
40 let file = File::open(path)?;
41 let file_size = file.metadata()?.len();
42
43 let mmap = unsafe { Mmap::map(&file)? };
45
46 if file_size > LARGE_FILE_THRESHOLD {
47 Self::process_large(&mmap, &output, &total_processed)
48 } else {
49 Self::process_small(&mmap, &output)
50 }
51 };
52
53 match process_result() {
54 Ok(_) => ok_println("Merge", &format!("{}", path.display())),
55 Err(e) => e_println("PROCESS_ERROR", &format!("Failed to process file [{}]: {}", path.display(), e)),
56 }
57 });
58
59 if let Ok(mut writer) = output.lock() {
61 let _ = writer.flush();
62 }
63
64 ok_println("MERGE_COMPLETE", "");
65 }
66
67 fn process_small(data: &Mmap, output: &Arc<Mutex<BufWriter<File>>>) -> std::io::Result<()> {
74 let mut writer = output.lock()
75 .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire lock"))?;
76
77 writer.write_all(data)?;
78 Ok(())
79 }
80
81 fn process_large(
89 data: &Mmap,
90 output: &Arc<Mutex<BufWriter<File>>>,
91 total_processed: &Arc<Mutex<u64>>
92 ) -> std::io::Result<()> {
93 let mut pos = 0;
94 let mut last_report_time = Instant::now();
95 let mut last_report_pos = 0;
96
97 while pos < data.len() {
98 let end = std::cmp::min(pos + CHUNK_SIZE, data.len());
100 let chunk = &data[pos..end];
101
102 {
104 let mut writer = output.lock()
105 .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire lock"))?;
106 writer.write_all(chunk)?;
107 }
108
109 pos = end;
110
111 let now = Instant::now();
113 if now.duration_since(last_report_time) >= PROGRESS_INTERVAL {
114 let bytes_written = pos - last_report_pos;
115 let elapsed = now.duration_since(last_report_time).as_secs_f64();
116 let mb_per_sec = (bytes_written as f64 / 1_048_576.0) / elapsed;
117
118 if let Ok(mut total) = total_processed.lock() {
120 *total += bytes_written as u64;
121 }
122
123 ok_println("PROGRESS", &format!("Processed {:.2} MB, Speed: {:.2} MB/s",
124 pos as f64 / 1_048_576.0, mb_per_sec));
125
126 last_report_time = now;
127 last_report_pos = pos;
128 }
129 }
130
131 Ok(())
132 }
133}