libfqcat/
lib.rs

1use flate2::read::MultiGzDecoder;
2use flate2::write::GzEncoder;
3use flate2::Compression;
4use futures::stream::FuturesUnordered;
5use futures::StreamExt;
6use glob::glob;
7use std::fs::{remove_file, File};
8use std::io::ErrorKind;
9use std::io::{self, BufRead, BufReader, BufWriter, Write};
10use std::path::PathBuf;
11use zstd::stream::write::Encoder;
12use zstd::Decoder;
13
14pub fn find_fastqs(search_dir: &String) -> Result<Vec<String>, io::Error> {
15    // Construct the search pattern
16    let pattern = if search_dir.ends_with('/') {
17        format!("{}*.fastq.gz", search_dir)
18    } else {
19        format!("{}/{}.fastq.gz", search_dir, "*")
20    };
21
22    // Initialize an empty vector to hold the paths
23    let mut fastq_files: Vec<String> = Vec::new();
24
25    // Use glob to search for files matching the pattern
26    for entry in glob(&pattern)
27        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Glob pattern error: {}", e)))?
28    {
29        match entry {
30            Ok(path) => {
31                if path
32                    .file_name()
33                    .and_then(|os_str| os_str.to_str())
34                    .map(|file_base| !file_base.starts_with("._"))
35                    .unwrap_or(false)
36                {
37                    fastq_files.push(path.display().to_string());
38                }
39            }
40            Err(e) => {
41                return Err(io::Error::new(
42                    io::ErrorKind::InvalidData,
43                    format!("Error recording FASTA name:\n{}", e),
44                ));
45            }
46        }
47    }
48
49    // Check if any matching files were found
50    if fastq_files.is_empty() {
51        return Err(io::Error::new(ErrorKind::NotFound, "No FASTQ files found"));
52    }
53
54    Ok(fastq_files)
55}
56
57pub fn prepare_for_merges(
58    file_list: Vec<String>,
59    search_dir: &String,
60) -> Result<Vec<String>, io::Error> {
61    let mut new_files = Vec::with_capacity(file_list.len());
62    let mut new_file_name: String;
63
64    for (i, file) in file_list.into_iter().enumerate() {
65        if i % 2 != 0 {
66            new_files.push(file);
67            continue;
68        }
69        new_file_name = if search_dir.ends_with('/') {
70            format!("{}tmp{}.fastq.zst", search_dir, i)
71        } else {
72            format!("{}/tmp{}.fastq.zst", search_dir, i)
73        };
74        recode_gzip_to_zstd(&file, &new_file_name)?;
75        new_files.push(new_file_name);
76    }
77
78    Ok(new_files)
79}
80
81fn recode_gzip_to_zstd(input_path: &str, output_path: &str) -> io::Result<()> {
82    let file = File::open(input_path)?;
83
84    let reader = BufReader::new(MultiGzDecoder::new(file));
85
86    let output_file = std::fs::OpenOptions::new()
87        .create(true)
88        .append(true)
89        .open(output_path)?;
90
91    let mut writer = std::io::BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
92
93    for line in reader.lines() {
94        match line {
95            Ok(line_content) => {
96                writeln!(writer, "{}", line_content)?;
97            }
98            Err(e) => {
99                return Err(io::Error::new(
100                    io::ErrorKind::InvalidData,
101                    format!("Error reading line:\n{}", e),
102                ));
103            }
104        }
105    }
106
107    Ok(())
108}
109
/// Two files that are combined in a single merge step.
///
/// `merge_pair` decides which of the two is appended onto the other.
#[derive(Debug, Clone)]
pub struct MergePair {
    /// First file of the pair, in input order.
    left_file: PathBuf,
    /// Second file of the pair, in input order.
    right_file: PathBuf,
}
115
116#[derive(Debug)]
117pub struct MergeTree {
118    level: usize,
119    post_merge_files: Vec<String>,
120    merge_pairs: Vec<MergePair>,
121    subtree: Option<Box<MergeTree>>,
122}
123
124pub fn build_merge_tree(
125    file_list: &Vec<String>,
126    level: Option<&usize>,
127) -> Result<MergeTree, io::Error> {
128    // handle the possibility that `level` was not specified
129    let previous_level = level.unwrap_or(&0);
130
131    // figure out how many files will be present after merging
132    let new_file_count: usize = if file_list.len() % 2 == 0 {
133        file_list.len() / 2
134    } else {
135        file_list.len() / 2 + 1
136    };
137
138    // allocate a vector of that length
139    let mut tmp_files: Vec<String> = Vec::with_capacity(new_file_count);
140
141    // construct a list of pairs with any remainders
142    let mut merge_pairs: Vec<MergePair> = Vec::with_capacity(new_file_count);
143    let mut iter = file_list.iter();
144    while let Some(left) = iter.next() {
145        if let Some(right) = iter.next() {
146            merge_pairs.push(MergePair {
147                left_file: PathBuf::from(left),
148                right_file: PathBuf::from(right),
149            });
150            tmp_files.push(left.to_string());
151        } else {
152            tmp_files.push(left.to_string());
153        }
154    }
155
156    // build a new tree for the provided files
157    let mut tree = MergeTree {
158        level: previous_level + 1,
159        post_merge_files: tmp_files,
160        merge_pairs,
161        subtree: None,
162    };
163
164    // if there are more than two new files, construct a subtree
165    if tree.post_merge_files.len() > 1 {
166        tree.subtree = Some(Box::new(build_merge_tree(
167            &tree.post_merge_files,
168            Some(&tree.level),
169        )?));
170    }
171
172    Ok(tree)
173}
174
175async fn merge_pair(pair: MergePair) -> io::Result<()> {
176    // convert the left file in the pair to a string
177    let left_file = pair
178        .left_file
179        .to_str()
180        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Left file was None"))?;
181
182    // based on whether the left file is an intermediate file that has
183    // resulted from a previous merge, bind file paths so that the correct
184    // file is appended to
185    let (file1, file2) = if left_file.contains("tmp") {
186        (&pair.left_file, &pair.right_file)
187    } else {
188        (&pair.right_file, &pair.left_file)
189    };
190
191    println!("Appending {} onto {}", file2.display(), file1.display());
192
193    // open and buffer the file to be appended and report its extension
194    let file_to_append = File::open(file2)?;
195    let ext = file2.extension().expect(
196        "File is incorrectly named. Be sure the file \
197        contains an extension that indicates whether it is compressed.",
198    );
199
200    // decode the file to append
201    if ext == "zst" {
202        let decoder = zstd::Decoder::new(file_to_append)?;
203
204        // buffer the reader, pull out the lines, and define the batch
205        let read_buffer = BufReader::new(decoder);
206        let mut batch: Vec<String> = Vec::with_capacity(1000);
207
208        // Open or create the output file and create a zstd encoder for it
209        let output_file = std::fs::OpenOptions::new()
210            .create(true)
211            .append(true)
212            .open(file1)?;
213        let mut encoder = BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
214
215        for line in read_buffer.lines() {
216            let this_line = line?;
217            batch.push(this_line);
218
219            if batch.len() == 1000 {
220                for batch_line in &batch {
221                    writeln!(encoder, "{}", batch_line)?;
222                }
223                batch.clear();
224            }
225        }
226
227        if !&batch.is_empty() {
228            for batch_line in batch {
229                writeln!(encoder, "{}", batch_line)?;
230            }
231        }
232
233        println!("Merge successful; removing {:?}.", file2.display());
234        if file2.display().to_string().contains("tmp") {
235            remove_file(file2)?;
236        }
237    } else if ext == "gz" {
238        let decoder = MultiGzDecoder::new(file_to_append);
239
240        // buffer the reader, pull out the lines, and define the batch
241        let read_buffer = BufReader::new(decoder);
242        let mut batch: Vec<String> = Vec::with_capacity(1000);
243
244        // Open or create the output file and create a zstd encoder for it
245        let output_file = std::fs::OpenOptions::new()
246            .create(true)
247            .append(true)
248            .open(file1)?;
249        let mut encoder = BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
250
251        for line in read_buffer.lines() {
252            let this_line = line?;
253            batch.push(this_line);
254
255            if batch.len() == 1000 {
256                for batch_line in &batch {
257                    writeln!(encoder, "{}", batch_line)?;
258                }
259                batch.clear();
260            }
261        }
262
263        if !&batch.is_empty() {
264            for batch_line in batch {
265                writeln!(encoder, "{}", batch_line)?;
266            }
267        }
268
269        println!("Merge successful; removing {:?}.", file2.display());
270        if file2.display().to_string().contains("tmp") {
271            remove_file(file2)?;
272        }
273    } else {
274        // buffer the reader, pull out the lines, and define the batch
275        let read_buffer = BufReader::new(file_to_append);
276        let mut batch = Vec::with_capacity(1000);
277
278        // Open or create the output file and create a zstd encoder for it
279        let output_file = std::fs::OpenOptions::new()
280            .create(true)
281            .append(true)
282            .open(file1)?;
283        let mut encoder = BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
284
285        for line in read_buffer.lines() {
286            let this_line = line?;
287            batch.push(this_line);
288
289            if batch.len() == 1000 {
290                for batch_line in &batch {
291                    writeln!(encoder, "{}", batch_line)?;
292                }
293                batch.clear();
294            }
295        }
296
297        if !&batch.is_empty() {
298            for batch_line in batch {
299                writeln!(encoder, "{}", batch_line)?;
300            }
301        }
302
303        println!("Merge successful; removing {:?}.", file2.display());
304        if file2.display().to_string().contains("tmp") {
305            remove_file(file2)?;
306        }
307    }
308
309    Ok(())
310}
311
312#[tokio::main]
313async fn process_mergepairs(pairs: Vec<MergePair>, level: usize) -> io::Result<()> {
314    println!("Processing file pairs at level {} of the merge tree", level);
315
316    let mut futures = FuturesUnordered::new();
317
318    for pair in &pairs {
319        futures.push(merge_pair(pair.clone()));
320    }
321
322    while let Some(result) = futures.next().await {
323        match result {
324            Ok(_) => println!("Successfully appended, compressed, and cleaned {:?}", &pairs),
325            Err(e) => eprintln!(
326                "An error occurred when running merges in parallel:\n'{}',\nError occurred when awaiting merge completions.", e
327            ),
328        }
329    }
330
331    Ok(())
332}
333
334pub fn traverse_tree(tree: &MergeTree) -> io::Result<()> {
335    process_mergepairs(tree.merge_pairs.clone(), tree.level)?;
336
337    // Recur on the subtree if it exists
338    if let Some(ref subtree) = tree.subtree {
339        traverse_tree(subtree)?;
340    }
341
342    Ok(())
343}
344
345pub fn publish_final_fastq(readdir: &str, output_name: &str) -> io::Result<()> {
346    println!("Running final conversion.");
347
348    // handle the input
349    let input_path = format!("{}/tmp0.fastq.zst", readdir);
350    let open_input = File::open(&input_path)?;
351    let decoder = std::io::BufReader::new(Decoder::new(open_input)?);
352
353    // handle the output
354    let open_output = File::create(output_name)?;
355    let mut encoder = BufWriter::new(GzEncoder::new(open_output, Compression::default()));
356
357    // convert to final output
358    for line in decoder.lines() {
359        match line {
360            Ok(line_content) => {
361                writeln!(encoder, "{}", line_content)?;
362            }
363            Err(e) => {
364                return Err(io::Error::new(
365                    io::ErrorKind::InvalidData,
366                    format!("Line could not be written:\n{}", e),
367                ));
368            }
369        }
370    }
371
372    println!(
373        "Conversion complete; removing {}",
374        format_args!("{}/tmp0.fastq.zst", readdir)
375    );
376
377    // remove final tmp file
378    remove_file(format!("{}/tmp0.fastq.zst", readdir))?;
379
380    Ok(())
381}