1use flate2::read::MultiGzDecoder;
2use flate2::write::GzEncoder;
3use flate2::Compression;
4use futures::stream::FuturesUnordered;
5use futures::StreamExt;
6use glob::glob;
7use std::fs::{remove_file, File};
8use std::io::ErrorKind;
9use std::io::{self, BufRead, BufReader, BufWriter, Write};
10use std::path::PathBuf;
11use zstd::stream::write::Encoder;
12use zstd::Decoder;
13
14pub fn find_fastqs(search_dir: &String) -> Result<Vec<String>, io::Error> {
15 let pattern = if search_dir.ends_with('/') {
17 format!("{}*.fastq.gz", search_dir)
18 } else {
19 format!("{}/{}.fastq.gz", search_dir, "*")
20 };
21
22 let mut fastq_files: Vec<String> = Vec::new();
24
25 for entry in glob(&pattern)
27 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Glob pattern error: {}", e)))?
28 {
29 match entry {
30 Ok(path) => {
31 if path
32 .file_name()
33 .and_then(|os_str| os_str.to_str())
34 .map(|file_base| !file_base.starts_with("._"))
35 .unwrap_or(false)
36 {
37 fastq_files.push(path.display().to_string());
38 }
39 }
40 Err(e) => {
41 return Err(io::Error::new(
42 io::ErrorKind::InvalidData,
43 format!("Error recording FASTA name:\n{}", e),
44 ));
45 }
46 }
47 }
48
49 if fastq_files.is_empty() {
51 return Err(io::Error::new(ErrorKind::NotFound, "No FASTQ files found"));
52 }
53
54 Ok(fastq_files)
55}
56
57pub fn prepare_for_merges(
58 file_list: Vec<String>,
59 search_dir: &String,
60) -> Result<Vec<String>, io::Error> {
61 let mut new_files = Vec::with_capacity(file_list.len());
62 let mut new_file_name: String;
63
64 for (i, file) in file_list.into_iter().enumerate() {
65 if i % 2 != 0 {
66 new_files.push(file);
67 continue;
68 }
69 new_file_name = if search_dir.ends_with('/') {
70 format!("{}tmp{}.fastq.zst", search_dir, i)
71 } else {
72 format!("{}/tmp{}.fastq.zst", search_dir, i)
73 };
74 recode_gzip_to_zstd(&file, &new_file_name)?;
75 new_files.push(new_file_name);
76 }
77
78 Ok(new_files)
79}
80
81fn recode_gzip_to_zstd(input_path: &str, output_path: &str) -> io::Result<()> {
82 let file = File::open(input_path)?;
83
84 let reader = BufReader::new(MultiGzDecoder::new(file));
85
86 let output_file = std::fs::OpenOptions::new()
87 .create(true)
88 .append(true)
89 .open(output_path)?;
90
91 let mut writer = std::io::BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
92
93 for line in reader.lines() {
94 match line {
95 Ok(line_content) => {
96 writeln!(writer, "{}", line_content)?;
97 }
98 Err(e) => {
99 return Err(io::Error::new(
100 io::ErrorKind::InvalidData,
101 format!("Error reading line:\n{}", e),
102 ));
103 }
104 }
105 }
106
107 Ok(())
108}
109
/// Two files that are to be merged into one.
///
/// `build_merge_tree` fills this from two consecutive entries of its file list:
/// the first becomes `left_file`, the second `right_file`.
#[derive(Clone, Debug)]
pub struct MergePair {
    /// First file of the pair.
    left_file: PathBuf,
    /// Second file of the pair.
    right_file: PathBuf,
}
115
/// One level of the merge plan, linked to the next level through `subtree`.
#[derive(Debug)]
pub struct MergeTree {
    /// 1-based depth of this level (the parent's level plus one).
    level: usize,
    /// Files expected to remain once this level's merges are done: the left
    /// file of every pair plus a trailing unpaired file, if any.
    post_merge_files: Vec<String>,
    /// Merges to perform at this level.
    merge_pairs: Vec<MergePair>,
    /// Next level, built from `post_merge_files`; `None` once at most one file remains.
    subtree: Option<Box<MergeTree>>,
}
123
124pub fn build_merge_tree(
125 file_list: &Vec<String>,
126 level: Option<&usize>,
127) -> Result<MergeTree, io::Error> {
128 let previous_level = level.unwrap_or(&0);
130
131 let new_file_count: usize = if file_list.len() % 2 == 0 {
133 file_list.len() / 2
134 } else {
135 file_list.len() / 2 + 1
136 };
137
138 let mut tmp_files: Vec<String> = Vec::with_capacity(new_file_count);
140
141 let mut merge_pairs: Vec<MergePair> = Vec::with_capacity(new_file_count);
143 let mut iter = file_list.iter();
144 while let Some(left) = iter.next() {
145 if let Some(right) = iter.next() {
146 merge_pairs.push(MergePair {
147 left_file: PathBuf::from(left),
148 right_file: PathBuf::from(right),
149 });
150 tmp_files.push(left.to_string());
151 } else {
152 tmp_files.push(left.to_string());
153 }
154 }
155
156 let mut tree = MergeTree {
158 level: previous_level + 1,
159 post_merge_files: tmp_files,
160 merge_pairs,
161 subtree: None,
162 };
163
164 if tree.post_merge_files.len() > 1 {
166 tree.subtree = Some(Box::new(build_merge_tree(
167 &tree.post_merge_files,
168 Some(&tree.level),
169 )?));
170 }
171
172 Ok(tree)
173}
174
175async fn merge_pair(pair: MergePair) -> io::Result<()> {
176 let left_file = pair
178 .left_file
179 .to_str()
180 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Left file was None"))?;
181
182 let (file1, file2) = if left_file.contains("tmp") {
186 (&pair.left_file, &pair.right_file)
187 } else {
188 (&pair.right_file, &pair.left_file)
189 };
190
191 println!("Appending {} onto {}", file2.display(), file1.display());
192
193 let file_to_append = File::open(file2)?;
195 let ext = file2.extension().expect(
196 "File is incorrectly named. Be sure the file \
197 contains an extension that indicates whether it is compressed.",
198 );
199
200 if ext == "zst" {
202 let decoder = zstd::Decoder::new(file_to_append)?;
203
204 let read_buffer = BufReader::new(decoder);
206 let mut batch: Vec<String> = Vec::with_capacity(1000);
207
208 let output_file = std::fs::OpenOptions::new()
210 .create(true)
211 .append(true)
212 .open(file1)?;
213 let mut encoder = BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
214
215 for line in read_buffer.lines() {
216 let this_line = line?;
217 batch.push(this_line);
218
219 if batch.len() == 1000 {
220 for batch_line in &batch {
221 writeln!(encoder, "{}", batch_line)?;
222 }
223 batch.clear();
224 }
225 }
226
227 if !&batch.is_empty() {
228 for batch_line in batch {
229 writeln!(encoder, "{}", batch_line)?;
230 }
231 }
232
233 println!("Merge successful; removing {:?}.", file2.display());
234 if file2.display().to_string().contains("tmp") {
235 remove_file(file2)?;
236 }
237 } else if ext == "gz" {
238 let decoder = MultiGzDecoder::new(file_to_append);
239
240 let read_buffer = BufReader::new(decoder);
242 let mut batch: Vec<String> = Vec::with_capacity(1000);
243
244 let output_file = std::fs::OpenOptions::new()
246 .create(true)
247 .append(true)
248 .open(file1)?;
249 let mut encoder = BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
250
251 for line in read_buffer.lines() {
252 let this_line = line?;
253 batch.push(this_line);
254
255 if batch.len() == 1000 {
256 for batch_line in &batch {
257 writeln!(encoder, "{}", batch_line)?;
258 }
259 batch.clear();
260 }
261 }
262
263 if !&batch.is_empty() {
264 for batch_line in batch {
265 writeln!(encoder, "{}", batch_line)?;
266 }
267 }
268
269 println!("Merge successful; removing {:?}.", file2.display());
270 if file2.display().to_string().contains("tmp") {
271 remove_file(file2)?;
272 }
273 } else {
274 let read_buffer = BufReader::new(file_to_append);
276 let mut batch = Vec::with_capacity(1000);
277
278 let output_file = std::fs::OpenOptions::new()
280 .create(true)
281 .append(true)
282 .open(file1)?;
283 let mut encoder = BufWriter::new(Encoder::new(output_file, 3)?.auto_finish());
284
285 for line in read_buffer.lines() {
286 let this_line = line?;
287 batch.push(this_line);
288
289 if batch.len() == 1000 {
290 for batch_line in &batch {
291 writeln!(encoder, "{}", batch_line)?;
292 }
293 batch.clear();
294 }
295 }
296
297 if !&batch.is_empty() {
298 for batch_line in batch {
299 writeln!(encoder, "{}", batch_line)?;
300 }
301 }
302
303 println!("Merge successful; removing {:?}.", file2.display());
304 if file2.display().to_string().contains("tmp") {
305 remove_file(file2)?;
306 }
307 }
308
309 Ok(())
310}
311
312#[tokio::main]
313async fn process_mergepairs(pairs: Vec<MergePair>, level: usize) -> io::Result<()> {
314 println!("Processing file pairs at level {} of the merge tree", level);
315
316 let mut futures = FuturesUnordered::new();
317
318 for pair in &pairs {
319 futures.push(merge_pair(pair.clone()));
320 }
321
322 while let Some(result) = futures.next().await {
323 match result {
324 Ok(_) => println!("Successfully appended, compressed, and cleaned {:?}", &pairs),
325 Err(e) => eprintln!(
326 "An error occurred when running merges in parallel:\n'{}',\nError occurred when awaiting merge completions.", e
327 ),
328 }
329 }
330
331 Ok(())
332}
333
334pub fn traverse_tree(tree: &MergeTree) -> io::Result<()> {
335 process_mergepairs(tree.merge_pairs.clone(), tree.level)?;
336
337 if let Some(ref subtree) = tree.subtree {
339 traverse_tree(subtree)?;
340 }
341
342 Ok(())
343}
344
345pub fn publish_final_fastq(readdir: &str, output_name: &str) -> io::Result<()> {
346 println!("Running final conversion.");
347
348 let input_path = format!("{}/tmp0.fastq.zst", readdir);
350 let open_input = File::open(&input_path)?;
351 let decoder = std::io::BufReader::new(Decoder::new(open_input)?);
352
353 let open_output = File::create(output_name)?;
355 let mut encoder = BufWriter::new(GzEncoder::new(open_output, Compression::default()));
356
357 for line in decoder.lines() {
359 match line {
360 Ok(line_content) => {
361 writeln!(encoder, "{}", line_content)?;
362 }
363 Err(e) => {
364 return Err(io::Error::new(
365 io::ErrorKind::InvalidData,
366 format!("Line could not be written:\n{}", e),
367 ));
368 }
369 }
370 }
371
372 println!(
373 "Conversion complete; removing {}",
374 format_args!("{}/tmp0.fastq.zst", readdir)
375 );
376
377 remove_file(format!("{}/tmp0.fastq.zst", readdir))?;
379
380 Ok(())
381}