flash_lib/
lib.rs

1use std::fs::{self, File};
2use std::io::{BufRead, BufReader, BufWriter, Write};
3use std::path::{Path, PathBuf};
4
5use anyhow::{Context, Result, bail, ensure};
6
7#[derive(Debug, Clone)]
8pub struct CombineParams {
9    pub min_overlap: usize,
10    pub max_overlap: usize,
11    pub max_mismatch_density: f32,
12    pub cap_mismatch_quals: bool,
13    pub allow_outies: bool,
14    pub lowercase_overhang: bool,
15    pub phred_offset: u8,
16}
17
18impl Default for CombineParams {
19    fn default() -> Self {
20        Self {
21            min_overlap: 10,
22            max_overlap: 65,
23            max_mismatch_density: 0.25,
24            cap_mismatch_quals: false,
25            allow_outies: false,
26            lowercase_overhang: false,
27            phred_offset: 33,
28        }
29    }
30}
31
32impl CombineParams {
33    pub fn validate(&self) -> Result<()> {
34        ensure!(self.min_overlap >= 1, "min-overlap must be >= 1");
35        ensure!(self.max_overlap >= 1, "max-overlap must be >= 1");
36        ensure!(
37            self.max_overlap >= self.min_overlap,
38            "max-overlap ({}) cannot be less than min-overlap ({})",
39            self.max_overlap,
40            self.min_overlap
41        );
42        ensure!(
43            self.max_mismatch_density >= 0.0,
44            "max-mismatch-density must be non-negative"
45        );
46        ensure!(
47            self.phred_offset <= 127,
48            "phred-offset must be in the range [0, 127]"
49        );
50        Ok(())
51    }
52}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub enum CombineStatus {
56    CombinedInnie,
57    CombinedOutie,
58}
59
60#[derive(Debug, Clone)]
61pub struct FastqRecord {
62    tag: String,
63    seq: Vec<u8>,
64    qual: Vec<u8>,
65}
66
67impl FastqRecord {
68    pub fn from_strs(tag: &str, seq: &str, qual: &str, phred_offset: u8) -> Result<Self> {
69        ensure!(
70            seq.len() == qual.len(),
71            "sequence and quality lengths differ"
72        );
73
74        let mut seq_bytes = Vec::with_capacity(seq.len());
75        for &base in seq.as_bytes() {
76            seq_bytes.push(canonical_base(base));
77        }
78
79        let mut qual_bytes = Vec::with_capacity(qual.len());
80        for (idx, &byte) in qual.as_bytes().iter().enumerate() {
81            if phred_offset > 0 && byte < phred_offset {
82                bail!(
83                    "quality char below PHRED offset ({}) at position {}",
84                    phred_offset,
85                    idx
86                );
87            }
88            qual_bytes.push(byte.saturating_sub(phred_offset));
89        }
90
91        Ok(Self {
92            tag: tag.to_string(),
93            seq: seq_bytes,
94            qual: qual_bytes,
95        })
96    }
97
98    pub fn len(&self) -> usize {
99        self.seq.len()
100    }
101
102    pub fn tag(&self) -> &str {
103        &self.tag
104    }
105
106    pub fn set_tag<S: Into<String>>(&mut self, tag: S) {
107        self.tag = tag.into();
108    }
109
110    pub fn seq_bytes(&self) -> &[u8] {
111        &self.seq
112    }
113
114    pub fn qual_bytes(&self) -> &[u8] {
115        &self.qual
116    }
117
118    pub fn seq_string(&self) -> String {
119        String::from_utf8_lossy(&self.seq).into_owned()
120    }
121
122    pub fn qual_string(&self, phred_offset: u8) -> String {
123        let mut buf = Vec::with_capacity(self.qual.len());
124        for &q in &self.qual {
125            buf.push(q + phred_offset);
126        }
127        String::from_utf8_lossy(&buf).into_owned()
128    }
129}
130
131pub struct FastqPairReader {
132    forward: BufReader<File>,
133    reverse: BufReader<File>,
134    forward_path: PathBuf,
135    reverse_path: PathBuf,
136    phred_offset: u8,
137}
138
139impl FastqPairReader {
140    pub fn from_paths(
141        forward: impl AsRef<Path>,
142        reverse: impl AsRef<Path>,
143        phred_offset: u8,
144    ) -> Result<Self> {
145        let forward_path = forward.as_ref().to_path_buf();
146        let reverse_path = reverse.as_ref().to_path_buf();
147        let forward_file = File::open(&forward_path)
148            .with_context(|| format!("failed to open forward FASTQ {:?}", forward_path))?;
149        let reverse_file = File::open(&reverse_path)
150            .with_context(|| format!("failed to open reverse FASTQ {:?}", reverse_path))?;
151
152        Ok(Self {
153            forward: BufReader::new(forward_file),
154            reverse: BufReader::new(reverse_file),
155            forward_path,
156            reverse_path,
157            phred_offset,
158        })
159    }
160
161    pub fn next_pair(&mut self) -> Result<Option<(FastqRecord, FastqRecord)>> {
162        let read1 = read_fastq_record(&mut self.forward, &self.forward_path, self.phred_offset)?;
163        let read2 = read_fastq_record(&mut self.reverse, &self.reverse_path, self.phred_offset)?;
164
165        match (read1, read2) {
166            (Some(r1), Some(r2)) => Ok(Some((r1, r2))),
167            (None, None) => Ok(None),
168            (Some(_), None) | (None, Some(_)) => {
169                bail!("FASTQ inputs have different number of records")
170            }
171        }
172    }
173}
174
175struct AlignmentCandidate {
176    position: usize,
177    mismatch_density: f32,
178    qual_score: f32,
179}
180
181struct MismatchStats {
182    effective_len: usize,
183    mismatches: u32,
184    mismatch_qual_total: u32,
185}
186
187#[derive(Debug, Clone)]
188pub struct CombineOutcome {
189    pub is_combined: bool,
190    pub combined_tag: Option<String>,
191    pub combined_seq: Option<String>,
192    pub combined_qual: Option<String>,
193}
194
195pub fn combine_pair_from_strs(
196    tag1: &str,
197    seq1: &str,
198    qual1: &str,
199    tag2: &str,
200    seq2: &str,
201    qual2: &str,
202    params: &CombineParams,
203) -> Result<CombineOutcome> {
204    let read1 = FastqRecord::from_strs(tag1, seq1, qual1, params.phred_offset)?;
205    let read2 = FastqRecord::from_strs(tag2, seq2, qual2, params.phred_offset)?;
206
207    let mut read2_rev = read2.clone();
208    reverse_complement(&mut read2_rev);
209
210    if let Some(mut combined_record) = combine_pair(&read1, &read2_rev, params) {
211        let combined_tag = combined_tag(&read1);
212        combined_record.set_tag(combined_tag.clone());
213        Ok(CombineOutcome {
214            is_combined: true,
215            combined_tag: Some(combined_tag),
216            combined_seq: Some(combined_record.seq_string()),
217            combined_qual: Some(combined_record.qual_string(params.phred_offset)),
218        })
219    } else {
220        Ok(CombineOutcome {
221            is_combined: false,
222            combined_tag: None,
223            combined_seq: None,
224            combined_qual: None,
225        })
226    }
227}
228
229pub fn merge_fastq_files(
230    forward: impl AsRef<Path>,
231    reverse: impl AsRef<Path>,
232    output_dir: impl AsRef<Path>,
233    output_prefix: &str,
234    params: &CombineParams,
235) -> Result<()> {
236    params.validate()?;
237
238    let forward = forward.as_ref();
239    let reverse = reverse.as_ref();
240    let output_dir = output_dir.as_ref();
241
242    fs::create_dir_all(output_dir)
243        .with_context(|| format!("failed to create output directory {:?}", output_dir))?;
244
245    let mut pair_reader = FastqPairReader::from_paths(forward, reverse, params.phred_offset)?;
246
247    let ext_path = output_dir.join(format!("{}.extendedFrags.fastq", output_prefix));
248    let not1_path = output_dir.join(format!("{}.notCombined_1.fastq", output_prefix));
249    let not2_path = output_dir.join(format!("{}.notCombined_2.fastq", output_prefix));
250
251    let mut out_extended = BufWriter::new(
252        File::create(&ext_path).with_context(|| format!("failed to create {:?}", ext_path))?,
253    );
254    let mut out_not1 = BufWriter::new(
255        File::create(&not1_path).with_context(|| format!("failed to create {:?}", not1_path))?,
256    );
257    let mut out_not2 = BufWriter::new(
258        File::create(&not2_path).with_context(|| format!("failed to create {:?}", not2_path))?,
259    );
260
261    loop {
262        match pair_reader.next_pair()? {
263            Some((r1, r2)) => {
264                process_pair(
265                    &r1,
266                    &r2,
267                    params,
268                    &mut out_extended,
269                    &mut out_not1,
270                    &mut out_not2,
271                )?;
272            }
273            None => break,
274        }
275    }
276
277    out_extended
278        .flush()
279        .context("failed to flush extendedFrags writer")?;
280    out_not1
281        .flush()
282        .context("failed to flush notCombined_1 writer")?;
283    out_not2
284        .flush()
285        .context("failed to flush notCombined_2 writer")?;
286
287    Ok(())
288}
289
290fn process_pair<W: Write>(
291    read1: &FastqRecord,
292    read2: &FastqRecord,
293    params: &CombineParams,
294    out_extended: &mut W,
295    out_not1: &mut W,
296    out_not2: &mut W,
297) -> Result<()> {
298    let mut read2_rev = read2.clone();
299    reverse_complement(&mut read2_rev);
300
301    if let Some(mut combined) = combine_pair(read1, &read2_rev, params) {
302        combined.tag = combined_tag(read1);
303        write_fastq(out_extended, &combined, params.phred_offset)?;
304    } else {
305        write_fastq(out_not1, read1, params.phred_offset)?;
306        write_fastq(out_not2, read2, params.phred_offset)?;
307    }
308
309    Ok(())
310}
311
312fn read_fastq_record<R: BufRead>(
313    reader: &mut R,
314    source: &Path,
315    phred_offset: u8,
316) -> Result<Option<FastqRecord>> {
317    let mut tag_line = String::new();
318    if reader.read_line(&mut tag_line)? == 0 {
319        return Ok(None);
320    }
321
322    let mut seq_line = String::new();
323    if reader.read_line(&mut seq_line)? == 0 {
324        bail!("unexpected EOF reading sequence in {:?}", source);
325    }
326
327    let mut plus_line = String::new();
328    if reader.read_line(&mut plus_line)? == 0 {
329        bail!("unexpected EOF reading '+' separator in {:?}", source);
330    }
331
332    let mut qual_line = String::new();
333    if reader.read_line(&mut qual_line)? == 0 {
334        bail!("unexpected EOF reading quality in {:?}", source);
335    }
336
337    trim_newline(&mut tag_line);
338    trim_newline(&mut seq_line);
339    trim_newline(&mut plus_line);
340    trim_newline(&mut qual_line);
341
342    ensure!(
343        !tag_line.is_empty() && tag_line.starts_with('@'),
344        "invalid FASTQ tag line in {:?}: {}",
345        source,
346        tag_line
347    );
348    ensure!(
349        !plus_line.is_empty() && plus_line.starts_with('+'),
350        "invalid FASTQ '+' separator in {:?}: {}",
351        source,
352        plus_line
353    );
354    ensure!(
355        seq_line.len() == qual_line.len(),
356        "sequence and quality lengths differ in {:?}: {} vs {}",
357        source,
358        seq_line.len(),
359        qual_line.len()
360    );
361
362    if seq_line
363        .bytes()
364        .any(|b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
365    {
366        bail!("sequence line contains whitespace in {:?}", source);
367    }
368
369    let seq: Vec<u8> = seq_line.bytes().map(canonical_base).collect();
370    let mut qual = Vec::with_capacity(qual_line.len());
371    for (idx, byte) in qual_line.bytes().enumerate() {
372        if phred_offset > 0 && byte < phred_offset {
373            bail!(
374                "quality char below PHRED offset ({}) at position {} in {:?}",
375                phred_offset,
376                idx,
377                source
378            );
379        }
380        qual.push(byte.saturating_sub(phred_offset));
381    }
382
383    Ok(Some(FastqRecord {
384        tag: tag_line,
385        seq,
386        qual,
387    }))
388}
389
390fn trim_newline(line: &mut String) {
391    while matches!(line.chars().last(), Some('\n') | Some('\r')) {
392        line.pop();
393    }
394}
395
396fn canonical_base(b: u8) -> u8 {
397    match b {
398        b'A' | b'a' => b'A',
399        b'C' | b'c' => b'C',
400        b'G' | b'g' => b'G',
401        b'T' | b't' => b'T',
402        b'N' | b'n' => b'N',
403        _ => b'N',
404    }
405}
406
407fn complement(base: u8) -> u8 {
408    match base {
409        b'A' => b'T',
410        b'T' => b'A',
411        b'C' => b'G',
412        b'G' => b'C',
413        _ => b'N',
414    }
415}
416
417fn to_lower(base: u8) -> u8 {
418    if (b'A'..=b'Z').contains(&base) {
419        base + 32
420    } else {
421        base
422    }
423}
424
425fn reverse_complement(read: &mut FastqRecord) {
426    let len = read.seq.len();
427    for i in 0..(len / 2) {
428        let j = len - 1 - i;
429        let base_i = read.seq[i];
430        let base_j = read.seq[j];
431        read.seq[i] = complement(base_j);
432        read.seq[j] = complement(base_i);
433        let qual_i = read.qual[i];
434        read.qual[i] = read.qual[j];
435        read.qual[j] = qual_i;
436    }
437    if len % 2 == 1 {
438        let mid = len / 2;
439        read.seq[mid] = complement(read.seq[mid]);
440    }
441}
442
443fn combine_pair(
444    read1: &FastqRecord,
445    read2_rev: &FastqRecord,
446    params: &CombineParams,
447) -> Option<FastqRecord> {
448    let mut best: Option<(AlignmentCandidate, CombineStatus)> =
449        evaluate_alignment(read1, read2_rev, params)
450            .map(|candidate| (candidate, CombineStatus::CombinedInnie));
451
452    if params.allow_outies {
453        if let Some(candidate) = evaluate_alignment(read2_rev, read1, params) {
454            match &best {
455                None => best = Some((candidate, CombineStatus::CombinedOutie)),
456                Some((best_cand, _)) => {
457                    if candidate.mismatch_density < best_cand.mismatch_density
458                        || (candidate.mismatch_density == best_cand.mismatch_density
459                            && candidate.qual_score < best_cand.qual_score)
460                    {
461                        best = Some((candidate, CombineStatus::CombinedOutie));
462                    }
463                }
464            }
465        }
466    }
467
468    let (candidate, status) = best?;
469    let (first, second) = match status {
470        CombineStatus::CombinedInnie => (read1, read2_rev),
471        CombineStatus::CombinedOutie => (read2_rev, read1),
472    };
473    Some(generate_combined_read(
474        first,
475        second,
476        candidate.position,
477        params,
478    ))
479}
480
481fn evaluate_alignment(
482    first: &FastqRecord,
483    second: &FastqRecord,
484    params: &CombineParams,
485) -> Option<AlignmentCandidate> {
486    if first.len() < params.min_overlap {
487        return None;
488    }
489
490    let have_n = first.seq.contains(&b'N') || second.seq.contains(&b'N');
491    let mut best_density = params.max_mismatch_density + 1.0;
492    let mut best_qual_score = 0.0f32;
493    let mut best_position: Option<usize> = None;
494
495    let start = if first.len() > second.len() {
496        first.len() - second.len()
497    } else {
498        0
499    };
500    let end = first
501        .len()
502        .saturating_sub(params.min_overlap)
503        .saturating_add(1);
504
505    for i in start..end {
506        let overlap_len = first.len().saturating_sub(i);
507        if overlap_len > second.len() {
508            continue;
509        }
510
511        let stats = compute_mismatch_stats(
512            &first.seq[i..first.len()],
513            &second.seq[..overlap_len],
514            &first.qual[i..first.len()],
515            &second.qual[..overlap_len],
516            have_n,
517        );
518
519        if stats.effective_len >= params.min_overlap {
520            let score_len = (stats.effective_len.min(params.max_overlap)).max(1) as f32;
521            let mismatch_density = stats.mismatches as f32 / score_len;
522            let qual_score = stats.mismatch_qual_total as f32 / score_len;
523
524            if mismatch_density <= best_density
525                && (mismatch_density < best_density || qual_score < best_qual_score)
526            {
527                best_density = mismatch_density;
528                best_qual_score = qual_score;
529                best_position = Some(i);
530            }
531        }
532    }
533
534    let position = best_position?;
535    if best_density > params.max_mismatch_density {
536        return None;
537    }
538
539    Some(AlignmentCandidate {
540        position,
541        mismatch_density: best_density,
542        qual_score: best_qual_score,
543    })
544}
545
546fn compute_mismatch_stats(
547    seq1: &[u8],
548    seq2: &[u8],
549    qual1: &[u8],
550    qual2: &[u8],
551    have_n: bool,
552) -> MismatchStats {
553    let mut effective_len = seq1.len();
554    let mut mismatches: u32 = 0;
555    let mut mismatch_qual_total: u32 = 0;
556
557    if have_n {
558        for i in 0..seq1.len() {
559            if seq1[i] == b'N' || seq2[i] == b'N' {
560                effective_len -= 1;
561            } else if seq1[i] != seq2[i] {
562                mismatches += 1;
563                mismatch_qual_total += qual1[i].min(qual2[i]) as u32;
564            }
565        }
566    } else {
567        for i in 0..seq1.len() {
568            if seq1[i] != seq2[i] {
569                mismatches += 1;
570                mismatch_qual_total += qual1[i].min(qual2[i]) as u32;
571            }
572        }
573    }
574
575    MismatchStats {
576        effective_len,
577        mismatches,
578        mismatch_qual_total,
579    }
580}
581
582fn generate_combined_read(
583    read1: &FastqRecord,
584    read2: &FastqRecord,
585    overlap_begin: usize,
586    params: &CombineParams,
587) -> FastqRecord {
588    let overlap_len = read1.len() - overlap_begin;
589    let remaining_len = read2.len().saturating_sub(overlap_len);
590    let combined_len = read1.len() + remaining_len;
591
592    let mut seq = Vec::with_capacity(combined_len);
593    let mut qual = Vec::with_capacity(combined_len);
594
595    for idx in 0..overlap_begin {
596        let base = read1.seq[idx];
597        let base = if params.lowercase_overhang {
598            to_lower(base)
599        } else {
600            base
601        };
602        seq.push(base);
603        qual.push(read1.qual[idx]);
604    }
605
606    for offset in 0..overlap_len {
607        let base1 = read1.seq[overlap_begin + offset];
608        let base2 = read2.seq[offset];
609        let q1 = read1.qual[overlap_begin + offset];
610        let q2 = read2.qual[offset];
611
612        if base1 == base2 {
613            seq.push(base1);
614            qual.push(q1.max(q2));
615        } else {
616            let q = if params.cap_mismatch_quals {
617                q1.min(q2).min(2)
618            } else {
619                q1.abs_diff(q2).max(2)
620            };
621            let base = if q1 > q2 {
622                base1
623            } else if q2 > q1 {
624                base2
625            } else if base2 == b'N' {
626                base1
627            } else {
628                base2
629            };
630            seq.push(base);
631            qual.push(q);
632        }
633    }
634
635    for idx in overlap_len..read2.len() {
636        let base = read2.seq[idx];
637        let base = if params.lowercase_overhang {
638            to_lower(base)
639        } else {
640            base
641        };
642        seq.push(base);
643        qual.push(read2.qual[idx]);
644    }
645
646    FastqRecord {
647        tag: String::new(),
648        seq,
649        qual,
650    }
651}
652
653fn combined_tag(read1: &FastqRecord) -> String {
654    let tag = &read1.tag;
655    if let Some(pos) = tag.rfind('/') {
656        if pos + 2 < tag.len() && tag.as_bytes()[pos + 2] == b'#' {
657            format!("{}{}", &tag[..pos], &tag[pos + 2..])
658        } else {
659            tag[..pos].to_string()
660        }
661    } else {
662        tag.clone()
663    }
664}
665
666fn write_fastq<W: Write>(writer: &mut W, read: &FastqRecord, phred_offset: u8) -> Result<()> {
667    writer
668        .write_all(read.tag.as_bytes())
669        .context("failed to write FASTQ tag")?;
670    writer.write_all(b"\n").context("failed to write newline")?;
671    writer
672        .write_all(&read.seq)
673        .context("failed to write FASTQ sequence")?;
674    writer
675        .write_all(b"\n+\n")
676        .context("failed to write FASTQ separator")?;
677
678    let mut qual_buf = Vec::with_capacity(read.qual.len());
679    for &q in &read.qual {
680        qual_buf.push(q + phred_offset);
681    }
682    writer
683        .write_all(&qual_buf)
684        .context("failed to write FASTQ quality")?;
685    writer.write_all(b"\n").context("failed to write newline")?;
686    Ok(())
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692
693    #[test]
694    fn trim_newline_removes_crlf() {
695        let mut s = String::from("TAG\r\n");
696        trim_newline(&mut s);
697        assert_eq!(s, "TAG");
698    }
699
700    #[test]
701    fn canonical_base_normalises_cases_and_unknowns() {
702        assert_eq!(canonical_base(b'a'), b'A');
703        assert_eq!(canonical_base(b'T'), b'T');
704        assert_eq!(canonical_base(b'!'), b'N');
705    }
706}