seq_here/process.rs

use super::error::{e_exit, e_println, ok_println};
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Handles conversion and combining operations for files
pub struct ConvertCombine;

const LARGE_FILE_THRESHOLD: u64 = 1_073_741_824; // 1 GiB - files above this use chunked processing
const CHUNK_SIZE: usize = 16 * 1024 * 1024; // 16 MiB - size of each write chunk
const PROGRESS_INTERVAL: Duration = Duration::from_secs(1); // report progress at most once per second

impl ConvertCombine {
    /// Combines multiple input files into a single output file.
    ///
    /// Files are processed in parallel, so the order in which their contents
    /// appear in the output is not guaranteed.
    ///
    /// # Arguments
    ///
    /// * `paths` - Vector of paths to input files
    /// * `output` - Path to the output file
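    ///
    /// # Examples
    ///
    /// A minimal usage sketch (marked `ignore`: it writes to disk, and the
    /// crate import path is assumed from this module's location):
    ///
    /// ```ignore
    /// use std::path::PathBuf;
    /// use seq_here::process::ConvertCombine;
    ///
    /// let inputs = vec![PathBuf::from("a.fasta"), PathBuf::from("b.fasta")];
    /// ConvertCombine::combine_all(inputs, PathBuf::from("combined.fasta"));
    /// ```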
    pub fn combine_all(paths: Vec<PathBuf>, output: PathBuf) {
        // Buffer output writes to reduce syscall overhead
        let output_file = match File::create(&output) {
            Ok(f) => f,
            Err(e) => e_exit("FILE_CREATE", &format!("Failed to create file: {}", e), 1),
        };
        let buffered_output = BufWriter::with_capacity(8 * 1024 * 1024, output_file);
        let output = Arc::new(Mutex::new(buffered_output));

        // Running total of bytes written, shared across worker threads
        let total_processed = Arc::new(Mutex::new(0u64));

        paths.par_iter().for_each(|path| {
            // Funnel every fallible step through one closure so errors are
            // handled in a single place below
            let process_result = || -> std::io::Result<()> {
                let file = File::open(path)?;
                let file_size = file.metadata()?.len();

                // Memory-map the input to avoid copying through an
                // intermediate read buffer.
                // SAFETY: the map is only valid while no other process
                // truncates or modifies the file underneath us.
                let mmap = unsafe { Mmap::map(&file)? };

                if file_size > LARGE_FILE_THRESHOLD {
                    Self::process_large(&mmap, &output, &total_processed)
                } else {
                    Self::process_small(&mmap, &output)
                }
            };

            match process_result() {
                Ok(_) => ok_println("MERGE", &format!("{}", path.display())),
                Err(e) => e_println(
                    "PROCESS_ERROR",
                    &format!("Failed to process file [{}]: {}", path.display(), e),
                ),
            }
        });

        // Flush buffered data to disk; report rather than silently drop a failure
        match output.lock() {
            Ok(mut writer) => {
                if let Err(e) = writer.flush() {
                    e_println("FLUSH_ERROR", &format!("Failed to flush output: {}", e));
                }
            }
            Err(_) => e_println("FLUSH_ERROR", "Output lock poisoned; buffered data may be lost"),
        }

        ok_println("MERGE_COMPLETE", "");
    }

    /// Process a small file by writing it directly to the output
    ///
    /// The lock is held for the whole write, so a small file's contents are
    /// never interleaved with data from other files.
    ///
    /// # Arguments
    ///
    /// * `data` - Memory-mapped file data
    /// * `output` - Shared output writer
    fn process_small(data: &Mmap, output: &Arc<Mutex<BufWriter<File>>>) -> std::io::Result<()> {
        let mut writer = output
            .lock()
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire lock"))?;

        writer.write_all(data)?;
        Ok(())
    }

    /// Process a large file in chunks with progress reporting
    ///
    /// The output lock is held for the entire file, so its chunks are written
    /// contiguously even when other files are being processed in parallel.
    ///
    /// # Arguments
    ///
    /// * `data` - Memory-mapped file data
    /// * `output` - Shared output writer
    /// * `total_processed` - Counter for total bytes processed
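    ///
    /// # Notes
    ///
    /// With the defaults above, a file just over the threshold is written as
    /// 64 chunks, since 1 GiB / 16 MiB = 1_073_741_824 / 16_777_216 = 64.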
    fn process_large(
        data: &Mmap,
        output: &Arc<Mutex<BufWriter<File>>>,
        total_processed: &Arc<Mutex<u64>>,
    ) -> std::io::Result<()> {
        let mut pos = 0;
        let mut last_report_time = Instant::now();
        let mut last_report_pos = 0;

        // Hold the output lock for the whole file so chunks from concurrently
        // processed files cannot interleave in the output
        let mut writer = output
            .lock()
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire lock"))?;

        while pos < data.len() {
            // Write the next chunk; the final chunk may be shorter than CHUNK_SIZE
            let end = std::cmp::min(pos + CHUNK_SIZE, data.len());
            writer.write_all(&data[pos..end])?;
            pos = end;

            // Throttle progress reports to the configured time interval
            // rather than reporting on every chunk
            let now = Instant::now();
            if now.duration_since(last_report_time) >= PROGRESS_INTERVAL {
                let bytes_written = pos - last_report_pos;
                let elapsed = now.duration_since(last_report_time).as_secs_f64();
                let mb_per_sec = (bytes_written as f64 / 1_048_576.0) / elapsed;

                // Fold this interval's bytes into the shared running total
                if let Ok(mut total) = total_processed.lock() {
                    *total += bytes_written as u64;
                }

                ok_println(
                    "PROGRESS",
                    &format!(
                        "Processed {:.2} MB, Speed: {:.2} MB/s",
                        pos as f64 / 1_048_576.0,
                        mb_per_sec
                    ),
                );

                last_report_time = now;
                last_report_pos = pos;
            }
        }

        // Count any bytes written since the last report so the running total
        // covers the entire file
        if pos > last_report_pos {
            if let Ok(mut total) = total_processed.lock() {
                *total += (pos - last_report_pos) as u64;
            }
        }

        Ok(())
    }
}
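
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    // A minimal smoke-test sketch for `combine_all`. The file names under the
    // OS temp dir are illustrative, and only the merged content is checked,
    // because `par_iter` makes the order of input files in the output
    // nondeterministic.
    #[test]
    fn combine_all_merges_contents() {
        let dir = std::env::temp_dir();
        let a = dir.join("seq_here_combine_a.txt");
        let b = dir.join("seq_here_combine_b.txt");
        let out = dir.join("seq_here_combine_out.txt");
        std::fs::write(&a, b"AAA").unwrap();
        std::fs::write(&b, b"BBB").unwrap();

        ConvertCombine::combine_all(vec![a, b], out.clone());

        let mut merged = String::new();
        File::open(&out).unwrap().read_to_string(&mut merged).unwrap();
        assert_eq!(merged.len(), 6);
        assert!(merged.contains("AAA") && merged.contains("BBB"));
    }
}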