Skip to main content

rsomics_bam_consensus/
lib.rs

1//! FASTA consensus from a coordinate-sorted BAM — simple-mode port of
2//! `samtools consensus -m simple`.
3//!
4//! Algorithm: `bam_consensus.c` `calculate_consensus_simple` + `basic_fasta`.
5//! Per reference position the engine accumulates a weighted score for each of
6//! A, C, G, T, and * (deletion); the weight per read is `qual` (or 1 when
7//! `use_qual = false`) times a compatibility factor from the seqi2A/C/G/T
8//! tables.  The highest-scoring allele is the call; if the second-highest
9//! scores ≥ `het_fract × score1` and `ambig` is on, the two are OR-ed into an
10//! IUPAC code.  If total depth < `min_depth` or the call's fraction of total
11//! score < `call_fract`, the call is N (or * for a gap).
12//!
13//! Reference source: `samtools/bam_consensus.c` (MIT), tag 1.23.1.
14//!
15//! Pileup engine: custom lightweight walker over coordinate-sorted BAM records,
16//! modelled on `samtools/bam_plbuf.c` / `consensus_pileup.c`.  Avoids the
17//! per-column `Vec` allocations and HashMap overlap tracking of the generic
18//! `rsomics-pileup` crate, which adds enough overhead to lose the perf gate
19//! on a 150x WGS fixture.  Single-threaded decode + CIGAR walk; all I/O is
20//! on the calling thread (readers are passed in from the caller).
21
22use std::io::Write;
23use std::num::NonZero;
24use std::path::Path;
25
26use rsomics_bamio::raw::RawRecord;
27use rsomics_common::{Result, RsomicsError};
28use serde::Serialize;
29
30// seqi2{A,C,G,T} lookup tables: index is 4-bit seq_nt16 (A=1 C=2 G=4 T=8).
31// Pure bases carry weight 8; 2-base ambiguity codes 4; 3-base 2; N 1.
32// Source: bam_consensus.c static arrays, lines ~1908–1911 in 1.23.1.
33//                            * A  C  M  G  R  S  V  T  W  Y  H  K  D  B  N
34const SEQI2A: [u8; 16] = [0, 8, 0, 4, 0, 4, 0, 2, 0, 4, 0, 2, 0, 2, 0, 1];
35const SEQI2C: [u8; 16] = [0, 0, 8, 4, 0, 0, 4, 2, 0, 0, 4, 2, 0, 0, 2, 1];
36const SEQI2G: [u8; 16] = [0, 0, 0, 0, 8, 4, 4, 1, 0, 0, 0, 0, 4, 2, 2, 1];
37const SEQI2T: [u8; 16] = [0, 0, 0, 0, 0, 0, 0, 0, 8, 4, 4, 2, 8, 2, 2, 1];
38
39/// Score/depth slots: 0=A 1=C 2=G 3=T 4=* (gap).
40const N_BASES: usize = 5;
41
42// SAM FLAG bits (SAMv1 §1.4).
43const FLAG_UNMAP: u16 = 0x4;
44const FLAG_SECONDARY: u16 = 0x100;
45const FLAG_QCFAIL: u16 = 0x200;
46const FLAG_DUP: u16 = 0x400;
47
48// CIGAR op codes (BAM packed encoding, low nibble).
49const CIGAR_MATCH: u8 = 0;
50const CIGAR_INS: u8 = 1;
51const CIGAR_DEL: u8 = 2;
52const CIGAR_REF_SKIP: u8 = 3;
53const CIGAR_SOFT_CLIP: u8 = 4;
54const CIGAR_EQUAL: u8 = 7;
55const CIGAR_DIFF: u8 = 8;
56
57/// Default exclude-flags matching samtools consensus (UNMAP|SECONDARY|QCFAIL|DUP).
58pub const DEFAULT_EXCL_FLAGS: u16 = FLAG_UNMAP | FLAG_SECONDARY | FLAG_QCFAIL | FLAG_DUP;
59
60#[derive(Debug, Clone)]
61pub struct ConsensusOpts {
62    /// Use base qualities as weights (`-q`/`--use-qual`). Default off.
63    pub use_qual: bool,
64    /// Minimum base quality to count a base (`--min-BQ`). Default 0.
65    pub min_qual: u8,
66    /// Minimum depth to emit a non-N call (`-d`/`--min-depth`). Default 1.
67    pub min_depth: u32,
68    /// Minimum fraction of total score that the called allele(s) must reach
69    /// (`-c`/`--call-fract`). Default 0.75.
70    pub call_fract: f64,
71    /// Minimum ratio of second-best to best score to call a het
72    /// (`-H`/`--het-fract`). Default 0.5.
73    pub het_fract: f64,
74    /// Enable IUPAC ambiguity codes (`-A`/`--ambig`). Default off.
75    pub ambig: bool,
76    /// Emit deletion (`*`) bases (`--show-del yes`). Default false.
77    pub show_del: bool,
78    /// FASTA/Q line length (`-l`/`--line-len`). Default 70.
79    pub line_len: usize,
80    /// Reads with any of these FLAG bits are excluded. Default `DEFAULT_EXCL_FLAGS`.
81    pub excl_flags: u16,
82    /// Only include reads with any of these FLAG bits set (0 = no filter).
83    pub incl_flags: u16,
84    /// Minimum mapping quality (`--min-MQ`). Default 0.
85    pub min_mapq: u8,
86}
87
88impl Default for ConsensusOpts {
89    fn default() -> Self {
90        Self {
91            use_qual: false,
92            min_qual: 0,
93            min_depth: 1,
94            call_fract: 0.75,
95            het_fract: 0.5,
96            ambig: false,
97            show_del: false,
98            line_len: 70,
99            excl_flags: DEFAULT_EXCL_FLAGS,
100            incl_flags: 0,
101            min_mapq: 0,
102        }
103    }
104}
105
106#[derive(Debug, Default, Clone, Serialize)]
107pub struct ConsensusStats {
108    pub sequences: u64,
109    pub positions: u64,
110}
111
112/// `bam_consensus.c::calculate_consensus_simple` — returns ASCII base call and
113/// quality (0–100 as a percentage of total score).
114///
115/// `reads`: `(base4, qual)` pairs, one per non-refskip read at this column.
116/// `base4` is the 4-bit seq_nt16 value; gap/del is encoded as 16.
117pub fn simple_call(reads: &[(u8, u8)], opts: &ConsensusOpts) -> (u8, u8) {
118    // IUPAC het table: maps seqi-bit OR-value to ASCII.
119    // Replicates `const char *het = "NACMGRSVTWYHKDBN*ac?g???t???????";` from
120    // bam_consensus.c `calculate_consensus_simple`.
121    const HET: &[u8; 32] = b"NACMGRSVTWYHKDBN*ac?g???t???????";
122
123    let mut score = [0u64; N_BASES]; // A C G T *
124    let mut tot_depth: u32 = 0;
125
126    for &(b4, q) in reads {
127        if q < opts.min_qual {
128            continue;
129        }
130        let w: u64 = if opts.use_qual { q as u64 } else { 1 };
131        if b4 < 16 {
132            let b = b4 as usize;
133            let qa = SEQI2A[b] as u64 * w;
134            let qc = SEQI2C[b] as u64 * w;
135            let qg = SEQI2G[b] as u64 * w;
136            let qt = SEQI2T[b] as u64 * w;
137            if qa > 0 {
138                score[0] += qa;
139            }
140            if qc > 0 {
141                score[1] += qc;
142            }
143            if qg > 0 {
144                score[2] += qg;
145            }
146            if qt > 0 {
147                score[3] += qt;
148            }
149        } else {
150            score[4] += 8 * w;
151        }
152        tot_depth += 1;
153    }
154
155    let tscore: u64 = score.iter().sum();
156
157    // Find best (call1) and second-best (call2).
158    // Slot i → seqi bit = 1 << i  (0→A=1, 1→C=2, 2→G=4, 3→T=8, 4→*=16).
159    let mut call1: usize = 15; // seqi 15 = N
160    let mut call2: usize = 15;
161    let mut score1: u64 = 0;
162    let mut score2: u64 = 0;
163
164    for (i, &s) in score.iter().enumerate() {
165        if s > score1 {
166            score2 = score1;
167            call2 = call1;
168            score1 = s;
169            call1 = 1 << i;
170        } else if s > score2 {
171            score2 = s;
172            call2 = 1 << i;
173        }
174    }
175
176    let mut used_base = call1;
177    let mut used_score = score1;
178
179    if opts.ambig && score1 > 0 && (score2 as f64) >= opts.het_fract * score1 as f64 {
180        used_base |= call2;
181        used_score += score2;
182    }
183
184    // Depth / fraction gate.
185    if tot_depth < opts.min_depth
186        || (tscore > 0 && (used_score as f64) < opts.call_fract * tscore as f64)
187    {
188        used_base = if call1 == 16 { 16 } else { 0 };
189    }
190
191    let cb = HET[used_base.min(31)];
192    let cq: u8 = if used_base != 0 && tscore > 0 {
193        (100 * used_score / tscore).min(100) as u8
194    } else {
195        0
196    };
197
198    (cb, cq)
199}
200
201// ---------------------------------------------------------------------------
202// Lightweight pileup walker — consensus-specific, no HashMap overhead.
203// Mirrors samtools consensus_pileup.c: a sorted buffer of active reads,
204// a reference cursor, and a CIGAR walk per read.
205// ---------------------------------------------------------------------------
206
207/// One active read in the pileup buffer.
208///
209/// At construction we flatten the CIGAR into a flat `slots` array indexed by
210/// reference offset from `beg`.  Each slot encodes `(base4, qual, flags)` in
211/// three bytes, making the per-column `at()` an O(1) array lookup with no
212/// pointer-chasing into the raw record bytes.
213///
214/// `base4` values: 0–15 = seq_nt16 base; 16 = deletion; 17 = ref-skip (intron).
215struct ActiveRead {
216    /// Reference start (inclusive, 0-based) of this read.
217    beg: i64,
218    /// Reference end (exclusive, 0-based).
219    end: i64,
220    /// tid of the read.
221    tid: i32,
222    /// Flattened per-reference-position data: (base4, qual).
223    /// Indexed by (ref_pos - beg) — length == (end - beg).
224    slots: Vec<(u8, u8)>,
225}
226
227/// Sentinel slot values for deletions and ref-skips.
228const SLOT_DEL: (u8, u8) = (16, 0);
229const SLOT_SKIP: (u8, u8) = (17, 0);
230
231impl ActiveRead {
232    /// Construct from a raw record, optionally reusing a recycled slot Vec from
233    /// the pool to avoid a malloc.
234    fn new(rec: RawRecord, mut slots: Vec<(u8, u8)>) -> Self {
235        let beg = i64::from(rec.alignment_start());
236        let tid = rec.reference_sequence_id();
237        let qscores: &[u8] = rec.quality_scores();
238
239        // Compute reference span for pre-allocation hint.
240        let span = {
241            let mut s: i64 = 0;
242            for (op, l) in rec.cigar_ops() {
243                match op {
244                    CIGAR_MATCH | CIGAR_DEL | CIGAR_REF_SKIP | CIGAR_EQUAL | CIGAR_DIFF => {
245                        s += i64::from(l);
246                    }
247                    _ => {}
248                }
249            }
250            s as usize
251        };
252        slots.clear();
253        slots.reserve(span.saturating_sub(slots.capacity()));
254
255        // Walk CIGAR once and fill slots.
256        let mut qpos: usize = 0;
257        for (op, l) in rec.cigar_ops() {
258            let l = l as usize;
259            match op {
260                CIGAR_MATCH | CIGAR_EQUAL | CIGAR_DIFF => {
261                    for i in 0..l {
262                        let qi = qpos + i;
263                        let b4 = rec.seq_nibble(qi);
264                        let q = if qi < qscores.len() { qscores[qi] } else { 0 };
265                        slots.push((b4, q));
266                    }
267                    qpos += l;
268                }
269                CIGAR_DEL => {
270                    slots.extend(std::iter::repeat_n(SLOT_DEL, l));
271                }
272                CIGAR_REF_SKIP => {
273                    slots.extend(std::iter::repeat_n(SLOT_SKIP, l));
274                }
275                CIGAR_INS | CIGAR_SOFT_CLIP => {
276                    qpos += l;
277                }
278                _ => {}
279            }
280        }
281
282        let end = beg + slots.len() as i64;
283        ActiveRead {
284            beg,
285            end,
286            tid,
287            slots,
288        }
289    }
290
291    /// Return the slot Vec to the pool for reuse.
292    fn retire(self, pool: &mut Vec<Vec<(u8, u8)>>) {
293        if pool.len() < 128 {
294            pool.push(self.slots);
295        }
296    }
297
298    /// O(1) lookup: returns `(base4, qual, is_del, is_refskip)` at reference `pos`.
299    #[inline]
300    fn at(&self, pos: i64) -> (u8, u8, bool, bool) {
301        let off = (pos - self.beg) as usize;
302        if off >= self.slots.len() {
303            return (0, 0, true, false);
304        }
305        let (b4, q) = self.slots[off];
306        match b4 {
307            16 => (16, 0, true, false), // deletion
308            17 => (16, 0, true, true),  // ref-skip
309            b => (b, q, false, false),
310        }
311    }
312}
313
314/// Write a FASTA record to `out`, wrapping at `line_len`.
315fn write_fasta(
316    out: &mut dyn Write,
317    name: &str,
318    seq: &[u8],
319    line_len: usize,
320) -> std::io::Result<()> {
321    if seq.is_empty() {
322        return Ok(());
323    }
324    writeln!(out, ">{name}")?;
325    for chunk in seq.chunks(line_len) {
326        out.write_all(chunk)?;
327        out.write_all(b"\n")?;
328    }
329    Ok(())
330}
331
332/// Compute simple-mode FASTA consensus for every covered contig.
333pub fn consensus(
334    bam_path: &Path,
335    out: &mut dyn Write,
336    opts: &ConsensusOpts,
337    workers: NonZero<usize>,
338) -> Result<ConsensusStats> {
339    let mut stats = ConsensusStats::default();
340
341    // raw::read_record bypasses the noodles record layer and reads directly from
342    // the inner BufRead via reader.get_mut(). bgzf::io::MultithreadedReader's
343    // BufRead impl interacts poorly with header-then-raw-read sequences across
344    // platforms, so we always use a single-threaded reader here.
345    let _ = workers;
346    let st = NonZero::<usize>::new(1).unwrap();
347    let mut reader = rsomics_bamio::open_with_workers(bam_path, st)?;
348    let header = reader.read_header().map_err(RsomicsError::Io)?;
349
350    let ref_names: Vec<String> = header
351        .reference_sequences()
352        .keys()
353        .map(ToString::to_string)
354        .collect();
355
356    let reader_mut = reader.get_mut();
357    // Two pre-allocated record buffers; we swap between them to avoid per-record
358    // heap allocation. `read_buf` is passed to read_record; `next_rec` holds the
359    // lookahead. When we consume next_rec into the active set, we std::mem::swap
360    // the two so read_buf reuses next_rec's allocation for the subsequent read.
361    let mut read_buf = RawRecord::default();
362    let mut next_rec: Option<RawRecord> = None;
363
364    // Active read buffer: reads whose alignment span covers the current cursor.
365    let mut active: Vec<ActiveRead> = Vec::with_capacity(512);
366    // Minimum end position among all active reads — used to skip retain() when
367    // cursor hasn't yet reached any read's end.  Recomputed after retain().
368    let mut active_min_end: i64 = i64::MAX;
369    // Per-column base buffer: (base4, qual) pairs for simple_call.
370    let mut col_buf: Vec<(u8, u8)> = Vec::with_capacity(512);
371    // Free list of recycled slot Vecs — avoids per-read malloc for the slot array.
372    // Pre-filled with 64 Vecs (capacity 300 = typical 150bp read × 2 for indels)
373    // so the initial burst of reads doesn't hit the global allocator.
374    let mut slot_pool: Vec<Vec<(u8, u8)>> = (0..64).map(|_| Vec::with_capacity(300)).collect();
375
376    // Current contig state.
377    let mut cur_tid: i32 = -1;
378    let mut cur_seq: Vec<u8> = Vec::new();
379    let mut last_pos: i64 = -1;
380
381    // Pileup cursor: (tid, pos) of next column to emit.
382    let mut cursor_tid: i32 = -1;
383    let mut cursor_pos: i64 = 0;
384
385    // Advance past filtered records: read into read_buf, swap into next_rec slot.
386    // Returns true if a record was successfully placed in next_rec.
387    macro_rules! read_next {
388        () => {{
389            let n = rsomics_bamio::raw::read_record(reader_mut, &mut read_buf)?;
390            if n > 0 {
391                // Swap: read_buf gets the old next_rec's Vec allocation (reuse),
392                // next_rec gets the freshly populated read_buf contents.
393                let mut tmp = next_rec.take().unwrap_or_default();
394                std::mem::swap(&mut read_buf, &mut tmp);
395                next_rec = Some(tmp);
396                true
397            } else {
398                next_rec = None;
399                false
400            }
401        }};
402    }
403
404    // Read the first record.
405    read_next!();
406
407    loop {
408        // Feed records that start at or before cursor_pos (or prime the cursor).
409        #[allow(clippy::while_let_loop)]
410        loop {
411            let Some(ref nr) = next_rec else { break };
412            let flag = nr.flags();
413            if nr.reference_sequence_id() < 0 || flag & FLAG_UNMAP != 0 {
414                read_next!();
415                continue;
416            }
417            if opts.excl_flags != 0 && flag & opts.excl_flags != 0 {
418                read_next!();
419                continue;
420            }
421            if opts.incl_flags != 0 && flag & opts.incl_flags == 0 {
422                read_next!();
423                continue;
424            }
425            if nr.mapping_quality() < opts.min_mapq {
426                read_next!();
427                continue;
428            }
429
430            let rtid = nr.reference_sequence_id();
431            let rbeg = i64::from(nr.alignment_start());
432
433            // If cursor not yet set, initialise to this read's start.
434            if cursor_tid < 0 {
435                cursor_tid = rtid;
436                cursor_pos = rbeg;
437            }
438
439            // Read is ahead of cursor — stop feeding.
440            if rtid > cursor_tid || (rtid == cursor_tid && rbeg > cursor_pos) {
441                break;
442            }
443
444            // Consume the lookahead record into the active set, then read next.
445            let cur_nr = next_rec.take().unwrap();
446            let recycled = slot_pool.pop().unwrap_or_default();
447            let ar = ActiveRead::new(cur_nr, recycled);
448            if ar.end < active_min_end {
449                active_min_end = ar.end;
450            }
451            active.push(ar);
452
453            read_next!();
454        }
455
456        // Prune reads that ended before cursor_pos.  Skip the O(n) retain when
457        // no read has ended yet (cursor hasn't reached the nearest end).
458        if cursor_pos >= active_min_end {
459            // Drain expired reads and return their slot Vecs to the pool.
460            let mut i = 0;
461            while i < active.len() {
462                if active[i].tid != cursor_tid || active[i].end <= cursor_pos {
463                    let retired = active.swap_remove(i);
464                    retired.retire(&mut slot_pool);
465                } else {
466                    i += 1;
467                }
468            }
469            active_min_end = active.iter().map(|r| r.end).fold(i64::MAX, i64::min);
470        }
471
472        if active.is_empty() {
473            active_min_end = i64::MAX;
474            // No active reads: either done or jump to next read's position.
475            let Some(ref nr) = next_rec else {
476                // All input consumed — emit final contig.
477                break;
478            };
479            // Jump cursor to next read.
480            let rtid = nr.reference_sequence_id();
481            let rbeg = i64::from(nr.alignment_start());
482            if rtid != cursor_tid {
483                // Contig switch with no active reads — flush current contig.
484                if cur_tid >= 0 && !cur_seq.is_empty() {
485                    let name = ref_names.get(cur_tid as usize).ok_or_else(|| {
486                        RsomicsError::InvalidInput(format!("tid {cur_tid} not in header"))
487                    })?;
488                    write_fasta(out, name, &cur_seq, opts.line_len).map_err(RsomicsError::Io)?;
489                    stats.sequences += 1;
490                    cur_seq.clear();
491                    cur_tid = -1;
492                    last_pos = -1;
493                }
494            }
495            cursor_tid = rtid;
496            cursor_pos = rbeg;
497            continue;
498        }
499
500        // Emit column at (cursor_tid, cursor_pos).
501        let emit_tid = cursor_tid;
502        let emit_pos = cursor_pos;
503
504        // Handle contig switch.
505        if emit_tid != cur_tid {
506            if cur_tid >= 0 && !cur_seq.is_empty() {
507                let name = ref_names.get(cur_tid as usize).ok_or_else(|| {
508                    RsomicsError::InvalidInput(format!("tid {cur_tid} not in header"))
509                })?;
510                write_fasta(out, name, &cur_seq, opts.line_len).map_err(RsomicsError::Io)?;
511                stats.sequences += 1;
512                cur_seq.clear();
513            }
514            cur_tid = emit_tid;
515            last_pos = -1;
516        }
517
518        // Collect bases from active reads.
519        col_buf.clear();
520        for ar in &mut active {
521            let (b4, q, is_del, is_refskip) = ar.at(emit_pos);
522            if is_refskip {
523                continue;
524            }
525            col_buf.push((if is_del { 16 } else { b4 }, q));
526        }
527
528        let (cb, _cq) = simple_call(&col_buf, opts);
529
530        if cb == b'*' && !opts.show_del {
531            last_pos = emit_pos;
532            cursor_pos += 1;
533            continue;
534        }
535
536        // Fill gap between last covered position and this one with N.
537        if !cur_seq.is_empty() && emit_pos > last_pos + 1 {
538            let gap = (emit_pos - last_pos - 1) as usize;
539            cur_seq.extend(std::iter::repeat_n(b'N', gap));
540            stats.positions += gap as u64;
541        }
542
543        cur_seq.push(cb);
544        stats.positions += 1;
545        last_pos = emit_pos;
546        cursor_pos += 1;
547    }
548
549    // Emit final contig.
550    if cur_tid >= 0 && !cur_seq.is_empty() {
551        let name = ref_names
552            .get(cur_tid as usize)
553            .ok_or_else(|| RsomicsError::InvalidInput(format!("tid {cur_tid} not in header")))?;
554        write_fasta(out, name, &cur_seq, opts.line_len).map_err(RsomicsError::Io)?;
555        stats.sequences += 1;
556    }
557
558    Ok(stats)
559}