1use std::fs::{self, File};
2use std::io::{BufRead, BufReader, BufWriter, Write};
3use std::path::{Path, PathBuf};
4
5use anyhow::{Context, Result, bail, ensure};
6
7#[derive(Debug, Clone)]
8pub struct CombineParams {
9 pub min_overlap: usize,
10 pub max_overlap: usize,
11 pub max_mismatch_density: f32,
12 pub cap_mismatch_quals: bool,
13 pub allow_outies: bool,
14 pub lowercase_overhang: bool,
15 pub phred_offset: u8,
16}
17
18impl Default for CombineParams {
19 fn default() -> Self {
20 Self {
21 min_overlap: 10,
22 max_overlap: 65,
23 max_mismatch_density: 0.25,
24 cap_mismatch_quals: false,
25 allow_outies: false,
26 lowercase_overhang: false,
27 phred_offset: 33,
28 }
29 }
30}
31
32impl CombineParams {
33 pub fn validate(&self) -> Result<()> {
34 ensure!(self.min_overlap >= 1, "min-overlap must be >= 1");
35 ensure!(self.max_overlap >= 1, "max-overlap must be >= 1");
36 ensure!(
37 self.max_overlap >= self.min_overlap,
38 "max-overlap ({}) cannot be less than min-overlap ({})",
39 self.max_overlap,
40 self.min_overlap
41 );
42 ensure!(
43 self.max_mismatch_density >= 0.0,
44 "max-mismatch-density must be non-negative"
45 );
46 ensure!(
47 self.phred_offset <= 127,
48 "phred-offset must be in the range [0, 127]"
49 );
50 Ok(())
51 }
52}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub enum CombineStatus {
56 CombinedInnie,
57 CombinedOutie,
58}
59
60#[derive(Debug, Clone)]
61pub struct FastqRecord {
62 tag: String,
63 seq: Vec<u8>,
64 qual: Vec<u8>,
65}
66
67impl FastqRecord {
68 pub fn from_strs(tag: &str, seq: &str, qual: &str, phred_offset: u8) -> Result<Self> {
69 ensure!(
70 seq.len() == qual.len(),
71 "sequence and quality lengths differ"
72 );
73
74 let mut seq_bytes = Vec::with_capacity(seq.len());
75 for &base in seq.as_bytes() {
76 seq_bytes.push(canonical_base(base));
77 }
78
79 let mut qual_bytes = Vec::with_capacity(qual.len());
80 for (idx, &byte) in qual.as_bytes().iter().enumerate() {
81 if phred_offset > 0 && byte < phred_offset {
82 bail!(
83 "quality char below PHRED offset ({}) at position {}",
84 phred_offset,
85 idx
86 );
87 }
88 qual_bytes.push(byte.saturating_sub(phred_offset));
89 }
90
91 Ok(Self {
92 tag: tag.to_string(),
93 seq: seq_bytes,
94 qual: qual_bytes,
95 })
96 }
97
98 pub fn len(&self) -> usize {
99 self.seq.len()
100 }
101
102 pub fn tag(&self) -> &str {
103 &self.tag
104 }
105
106 pub fn set_tag<S: Into<String>>(&mut self, tag: S) {
107 self.tag = tag.into();
108 }
109
110 pub fn seq_bytes(&self) -> &[u8] {
111 &self.seq
112 }
113
114 pub fn qual_bytes(&self) -> &[u8] {
115 &self.qual
116 }
117
118 pub fn seq_string(&self) -> String {
119 String::from_utf8_lossy(&self.seq).into_owned()
120 }
121
122 pub fn qual_string(&self, phred_offset: u8) -> String {
123 let mut buf = Vec::with_capacity(self.qual.len());
124 for &q in &self.qual {
125 buf.push(q + phred_offset);
126 }
127 String::from_utf8_lossy(&buf).into_owned()
128 }
129}
130
131pub struct FastqPairReader {
132 forward: BufReader<File>,
133 reverse: BufReader<File>,
134 forward_path: PathBuf,
135 reverse_path: PathBuf,
136 phred_offset: u8,
137}
138
139impl FastqPairReader {
140 pub fn from_paths(
141 forward: impl AsRef<Path>,
142 reverse: impl AsRef<Path>,
143 phred_offset: u8,
144 ) -> Result<Self> {
145 let forward_path = forward.as_ref().to_path_buf();
146 let reverse_path = reverse.as_ref().to_path_buf();
147 let forward_file = File::open(&forward_path)
148 .with_context(|| format!("failed to open forward FASTQ {:?}", forward_path))?;
149 let reverse_file = File::open(&reverse_path)
150 .with_context(|| format!("failed to open reverse FASTQ {:?}", reverse_path))?;
151
152 Ok(Self {
153 forward: BufReader::new(forward_file),
154 reverse: BufReader::new(reverse_file),
155 forward_path,
156 reverse_path,
157 phred_offset,
158 })
159 }
160
161 pub fn next_pair(&mut self) -> Result<Option<(FastqRecord, FastqRecord)>> {
162 let read1 = read_fastq_record(&mut self.forward, &self.forward_path, self.phred_offset)?;
163 let read2 = read_fastq_record(&mut self.reverse, &self.reverse_path, self.phred_offset)?;
164
165 match (read1, read2) {
166 (Some(r1), Some(r2)) => Ok(Some((r1, r2))),
167 (None, None) => Ok(None),
168 (Some(_), None) | (None, Some(_)) => {
169 bail!("FASTQ inputs have different number of records")
170 }
171 }
172 }
173}
174
175struct AlignmentCandidate {
176 position: usize,
177 mismatch_density: f32,
178 qual_score: f32,
179}
180
181struct MismatchStats {
182 effective_len: usize,
183 mismatches: u32,
184 mismatch_qual_total: u32,
185}
186
187#[derive(Debug, Clone)]
188pub struct CombineOutcome {
189 pub is_combined: bool,
190 pub combined_tag: Option<String>,
191 pub combined_seq: Option<String>,
192 pub combined_qual: Option<String>,
193}
194
195pub fn combine_pair_from_strs(
196 tag1: &str,
197 seq1: &str,
198 qual1: &str,
199 tag2: &str,
200 seq2: &str,
201 qual2: &str,
202 params: &CombineParams,
203) -> Result<CombineOutcome> {
204 let read1 = FastqRecord::from_strs(tag1, seq1, qual1, params.phred_offset)?;
205 let read2 = FastqRecord::from_strs(tag2, seq2, qual2, params.phred_offset)?;
206
207 let mut read2_rev = read2.clone();
208 reverse_complement(&mut read2_rev);
209
210 if let Some(mut combined_record) = combine_pair(&read1, &read2_rev, params) {
211 let combined_tag = combined_tag(&read1);
212 combined_record.set_tag(combined_tag.clone());
213 Ok(CombineOutcome {
214 is_combined: true,
215 combined_tag: Some(combined_tag),
216 combined_seq: Some(combined_record.seq_string()),
217 combined_qual: Some(combined_record.qual_string(params.phred_offset)),
218 })
219 } else {
220 Ok(CombineOutcome {
221 is_combined: false,
222 combined_tag: None,
223 combined_seq: None,
224 combined_qual: None,
225 })
226 }
227}
228
229pub fn merge_fastq_files(
230 forward: impl AsRef<Path>,
231 reverse: impl AsRef<Path>,
232 output_dir: impl AsRef<Path>,
233 output_prefix: &str,
234 params: &CombineParams,
235) -> Result<()> {
236 params.validate()?;
237
238 let forward = forward.as_ref();
239 let reverse = reverse.as_ref();
240 let output_dir = output_dir.as_ref();
241
242 fs::create_dir_all(output_dir)
243 .with_context(|| format!("failed to create output directory {:?}", output_dir))?;
244
245 let mut pair_reader = FastqPairReader::from_paths(forward, reverse, params.phred_offset)?;
246
247 let ext_path = output_dir.join(format!("{}.extendedFrags.fastq", output_prefix));
248 let not1_path = output_dir.join(format!("{}.notCombined_1.fastq", output_prefix));
249 let not2_path = output_dir.join(format!("{}.notCombined_2.fastq", output_prefix));
250
251 let mut out_extended = BufWriter::new(
252 File::create(&ext_path).with_context(|| format!("failed to create {:?}", ext_path))?,
253 );
254 let mut out_not1 = BufWriter::new(
255 File::create(¬1_path).with_context(|| format!("failed to create {:?}", not1_path))?,
256 );
257 let mut out_not2 = BufWriter::new(
258 File::create(¬2_path).with_context(|| format!("failed to create {:?}", not2_path))?,
259 );
260
261 loop {
262 match pair_reader.next_pair()? {
263 Some((r1, r2)) => {
264 process_pair(
265 &r1,
266 &r2,
267 params,
268 &mut out_extended,
269 &mut out_not1,
270 &mut out_not2,
271 )?;
272 }
273 None => break,
274 }
275 }
276
277 out_extended
278 .flush()
279 .context("failed to flush extendedFrags writer")?;
280 out_not1
281 .flush()
282 .context("failed to flush notCombined_1 writer")?;
283 out_not2
284 .flush()
285 .context("failed to flush notCombined_2 writer")?;
286
287 Ok(())
288}
289
290fn process_pair<W: Write>(
291 read1: &FastqRecord,
292 read2: &FastqRecord,
293 params: &CombineParams,
294 out_extended: &mut W,
295 out_not1: &mut W,
296 out_not2: &mut W,
297) -> Result<()> {
298 let mut read2_rev = read2.clone();
299 reverse_complement(&mut read2_rev);
300
301 if let Some(mut combined) = combine_pair(read1, &read2_rev, params) {
302 combined.tag = combined_tag(read1);
303 write_fastq(out_extended, &combined, params.phred_offset)?;
304 } else {
305 write_fastq(out_not1, read1, params.phred_offset)?;
306 write_fastq(out_not2, read2, params.phred_offset)?;
307 }
308
309 Ok(())
310}
311
312fn read_fastq_record<R: BufRead>(
313 reader: &mut R,
314 source: &Path,
315 phred_offset: u8,
316) -> Result<Option<FastqRecord>> {
317 let mut tag_line = String::new();
318 if reader.read_line(&mut tag_line)? == 0 {
319 return Ok(None);
320 }
321
322 let mut seq_line = String::new();
323 if reader.read_line(&mut seq_line)? == 0 {
324 bail!("unexpected EOF reading sequence in {:?}", source);
325 }
326
327 let mut plus_line = String::new();
328 if reader.read_line(&mut plus_line)? == 0 {
329 bail!("unexpected EOF reading '+' separator in {:?}", source);
330 }
331
332 let mut qual_line = String::new();
333 if reader.read_line(&mut qual_line)? == 0 {
334 bail!("unexpected EOF reading quality in {:?}", source);
335 }
336
337 trim_newline(&mut tag_line);
338 trim_newline(&mut seq_line);
339 trim_newline(&mut plus_line);
340 trim_newline(&mut qual_line);
341
342 ensure!(
343 !tag_line.is_empty() && tag_line.starts_with('@'),
344 "invalid FASTQ tag line in {:?}: {}",
345 source,
346 tag_line
347 );
348 ensure!(
349 !plus_line.is_empty() && plus_line.starts_with('+'),
350 "invalid FASTQ '+' separator in {:?}: {}",
351 source,
352 plus_line
353 );
354 ensure!(
355 seq_line.len() == qual_line.len(),
356 "sequence and quality lengths differ in {:?}: {} vs {}",
357 source,
358 seq_line.len(),
359 qual_line.len()
360 );
361
362 if seq_line
363 .bytes()
364 .any(|b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
365 {
366 bail!("sequence line contains whitespace in {:?}", source);
367 }
368
369 let seq: Vec<u8> = seq_line.bytes().map(canonical_base).collect();
370 let mut qual = Vec::with_capacity(qual_line.len());
371 for (idx, byte) in qual_line.bytes().enumerate() {
372 if phred_offset > 0 && byte < phred_offset {
373 bail!(
374 "quality char below PHRED offset ({}) at position {} in {:?}",
375 phred_offset,
376 idx,
377 source
378 );
379 }
380 qual.push(byte.saturating_sub(phred_offset));
381 }
382
383 Ok(Some(FastqRecord {
384 tag: tag_line,
385 seq,
386 qual,
387 }))
388}
389
390fn trim_newline(line: &mut String) {
391 while matches!(line.chars().last(), Some('\n') | Some('\r')) {
392 line.pop();
393 }
394}
395
396fn canonical_base(b: u8) -> u8 {
397 match b {
398 b'A' | b'a' => b'A',
399 b'C' | b'c' => b'C',
400 b'G' | b'g' => b'G',
401 b'T' | b't' => b'T',
402 b'N' | b'n' => b'N',
403 _ => b'N',
404 }
405}
406
407fn complement(base: u8) -> u8 {
408 match base {
409 b'A' => b'T',
410 b'T' => b'A',
411 b'C' => b'G',
412 b'G' => b'C',
413 _ => b'N',
414 }
415}
416
417fn to_lower(base: u8) -> u8 {
418 if (b'A'..=b'Z').contains(&base) {
419 base + 32
420 } else {
421 base
422 }
423}
424
425fn reverse_complement(read: &mut FastqRecord) {
426 let len = read.seq.len();
427 for i in 0..(len / 2) {
428 let j = len - 1 - i;
429 let base_i = read.seq[i];
430 let base_j = read.seq[j];
431 read.seq[i] = complement(base_j);
432 read.seq[j] = complement(base_i);
433 let qual_i = read.qual[i];
434 read.qual[i] = read.qual[j];
435 read.qual[j] = qual_i;
436 }
437 if len % 2 == 1 {
438 let mid = len / 2;
439 read.seq[mid] = complement(read.seq[mid]);
440 }
441}
442
443fn combine_pair(
444 read1: &FastqRecord,
445 read2_rev: &FastqRecord,
446 params: &CombineParams,
447) -> Option<FastqRecord> {
448 let mut best: Option<(AlignmentCandidate, CombineStatus)> =
449 evaluate_alignment(read1, read2_rev, params)
450 .map(|candidate| (candidate, CombineStatus::CombinedInnie));
451
452 if params.allow_outies {
453 if let Some(candidate) = evaluate_alignment(read2_rev, read1, params) {
454 match &best {
455 None => best = Some((candidate, CombineStatus::CombinedOutie)),
456 Some((best_cand, _)) => {
457 if candidate.mismatch_density < best_cand.mismatch_density
458 || (candidate.mismatch_density == best_cand.mismatch_density
459 && candidate.qual_score < best_cand.qual_score)
460 {
461 best = Some((candidate, CombineStatus::CombinedOutie));
462 }
463 }
464 }
465 }
466 }
467
468 let (candidate, status) = best?;
469 let (first, second) = match status {
470 CombineStatus::CombinedInnie => (read1, read2_rev),
471 CombineStatus::CombinedOutie => (read2_rev, read1),
472 };
473 Some(generate_combined_read(
474 first,
475 second,
476 candidate.position,
477 params,
478 ))
479}
480
481fn evaluate_alignment(
482 first: &FastqRecord,
483 second: &FastqRecord,
484 params: &CombineParams,
485) -> Option<AlignmentCandidate> {
486 if first.len() < params.min_overlap {
487 return None;
488 }
489
490 let have_n = first.seq.contains(&b'N') || second.seq.contains(&b'N');
491 let mut best_density = params.max_mismatch_density + 1.0;
492 let mut best_qual_score = 0.0f32;
493 let mut best_position: Option<usize> = None;
494
495 let start = if first.len() > second.len() {
496 first.len() - second.len()
497 } else {
498 0
499 };
500 let end = first
501 .len()
502 .saturating_sub(params.min_overlap)
503 .saturating_add(1);
504
505 for i in start..end {
506 let overlap_len = first.len().saturating_sub(i);
507 if overlap_len > second.len() {
508 continue;
509 }
510
511 let stats = compute_mismatch_stats(
512 &first.seq[i..first.len()],
513 &second.seq[..overlap_len],
514 &first.qual[i..first.len()],
515 &second.qual[..overlap_len],
516 have_n,
517 );
518
519 if stats.effective_len >= params.min_overlap {
520 let score_len = (stats.effective_len.min(params.max_overlap)).max(1) as f32;
521 let mismatch_density = stats.mismatches as f32 / score_len;
522 let qual_score = stats.mismatch_qual_total as f32 / score_len;
523
524 if mismatch_density <= best_density
525 && (mismatch_density < best_density || qual_score < best_qual_score)
526 {
527 best_density = mismatch_density;
528 best_qual_score = qual_score;
529 best_position = Some(i);
530 }
531 }
532 }
533
534 let position = best_position?;
535 if best_density > params.max_mismatch_density {
536 return None;
537 }
538
539 Some(AlignmentCandidate {
540 position,
541 mismatch_density: best_density,
542 qual_score: best_qual_score,
543 })
544}
545
546fn compute_mismatch_stats(
547 seq1: &[u8],
548 seq2: &[u8],
549 qual1: &[u8],
550 qual2: &[u8],
551 have_n: bool,
552) -> MismatchStats {
553 let mut effective_len = seq1.len();
554 let mut mismatches: u32 = 0;
555 let mut mismatch_qual_total: u32 = 0;
556
557 if have_n {
558 for i in 0..seq1.len() {
559 if seq1[i] == b'N' || seq2[i] == b'N' {
560 effective_len -= 1;
561 } else if seq1[i] != seq2[i] {
562 mismatches += 1;
563 mismatch_qual_total += qual1[i].min(qual2[i]) as u32;
564 }
565 }
566 } else {
567 for i in 0..seq1.len() {
568 if seq1[i] != seq2[i] {
569 mismatches += 1;
570 mismatch_qual_total += qual1[i].min(qual2[i]) as u32;
571 }
572 }
573 }
574
575 MismatchStats {
576 effective_len,
577 mismatches,
578 mismatch_qual_total,
579 }
580}
581
582fn generate_combined_read(
583 read1: &FastqRecord,
584 read2: &FastqRecord,
585 overlap_begin: usize,
586 params: &CombineParams,
587) -> FastqRecord {
588 let overlap_len = read1.len() - overlap_begin;
589 let remaining_len = read2.len().saturating_sub(overlap_len);
590 let combined_len = read1.len() + remaining_len;
591
592 let mut seq = Vec::with_capacity(combined_len);
593 let mut qual = Vec::with_capacity(combined_len);
594
595 for idx in 0..overlap_begin {
596 let base = read1.seq[idx];
597 let base = if params.lowercase_overhang {
598 to_lower(base)
599 } else {
600 base
601 };
602 seq.push(base);
603 qual.push(read1.qual[idx]);
604 }
605
606 for offset in 0..overlap_len {
607 let base1 = read1.seq[overlap_begin + offset];
608 let base2 = read2.seq[offset];
609 let q1 = read1.qual[overlap_begin + offset];
610 let q2 = read2.qual[offset];
611
612 if base1 == base2 {
613 seq.push(base1);
614 qual.push(q1.max(q2));
615 } else {
616 let q = if params.cap_mismatch_quals {
617 q1.min(q2).min(2)
618 } else {
619 q1.abs_diff(q2).max(2)
620 };
621 let base = if q1 > q2 {
622 base1
623 } else if q2 > q1 {
624 base2
625 } else if base2 == b'N' {
626 base1
627 } else {
628 base2
629 };
630 seq.push(base);
631 qual.push(q);
632 }
633 }
634
635 for idx in overlap_len..read2.len() {
636 let base = read2.seq[idx];
637 let base = if params.lowercase_overhang {
638 to_lower(base)
639 } else {
640 base
641 };
642 seq.push(base);
643 qual.push(read2.qual[idx]);
644 }
645
646 FastqRecord {
647 tag: String::new(),
648 seq,
649 qual,
650 }
651}
652
653fn combined_tag(read1: &FastqRecord) -> String {
654 let tag = &read1.tag;
655 if let Some(pos) = tag.rfind('/') {
656 if pos + 2 < tag.len() && tag.as_bytes()[pos + 2] == b'#' {
657 format!("{}{}", &tag[..pos], &tag[pos + 2..])
658 } else {
659 tag[..pos].to_string()
660 }
661 } else {
662 tag.clone()
663 }
664}
665
666fn write_fastq<W: Write>(writer: &mut W, read: &FastqRecord, phred_offset: u8) -> Result<()> {
667 writer
668 .write_all(read.tag.as_bytes())
669 .context("failed to write FASTQ tag")?;
670 writer.write_all(b"\n").context("failed to write newline")?;
671 writer
672 .write_all(&read.seq)
673 .context("failed to write FASTQ sequence")?;
674 writer
675 .write_all(b"\n+\n")
676 .context("failed to write FASTQ separator")?;
677
678 let mut qual_buf = Vec::with_capacity(read.qual.len());
679 for &q in &read.qual {
680 qual_buf.push(q + phred_offset);
681 }
682 writer
683 .write_all(&qual_buf)
684 .context("failed to write FASTQ quality")?;
685 writer.write_all(b"\n").context("failed to write newline")?;
686 Ok(())
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692
693 #[test]
694 fn trim_newline_removes_crlf() {
695 let mut s = String::from("TAG\r\n");
696 trim_newline(&mut s);
697 assert_eq!(s, "TAG");
698 }
699
700 #[test]
701 fn canonical_base_normalises_cases_and_unknowns() {
702 assert_eq!(canonical_base(b'a'), b'A');
703 assert_eq!(canonical_base(b'T'), b'T');
704 assert_eq!(canonical_base(b'!'), b'N');
705 }
706}