rsomics_bam_consensus/lib.rs
1//! FASTA consensus from a coordinate-sorted BAM — simple-mode port of
2//! `samtools consensus -m simple`.
3//!
4//! Algorithm: `bam_consensus.c` `calculate_consensus_simple` + `basic_fasta`.
5//! Per reference position the engine accumulates a weighted score for each of
6//! A, C, G, T, and * (deletion); the weight per read is `qual` (or 1 when
7//! `use_qual = false`) times a compatibility factor from the seqi2A/C/G/T
8//! tables. The highest-scoring allele is the call; if the second-highest
9//! scores ≥ `het_fract × score1` and `ambig` is on, the two are OR-ed into an
10//! IUPAC code. If total depth < `min_depth` or the call's fraction of total
11//! score < `call_fract`, the call is N (or * for a gap).
12//!
13//! Reference source: `samtools/bam_consensus.c` (MIT), tag 1.23.1.
14//!
15//! Pileup engine: custom lightweight walker over coordinate-sorted BAM records,
16//! modelled on `samtools/bam_plbuf.c` / `consensus_pileup.c`. Avoids the
17//! per-column `Vec` allocations and HashMap overlap tracking of the generic
18//! `rsomics-pileup` crate, which adds enough overhead to lose the perf gate
19//! on a 150x WGS fixture. Single-threaded decode + CIGAR walk; all I/O is
20//! on the calling thread (readers are passed in from the caller).
21
22use std::io::Write;
23use std::num::NonZero;
24use std::path::Path;
25
26use rsomics_bamio::raw::RawRecord;
27use rsomics_common::{Result, RsomicsError};
28use serde::Serialize;
29
30// seqi2{A,C,G,T} lookup tables: index is 4-bit seq_nt16 (A=1 C=2 G=4 T=8).
31// Pure bases carry weight 8; 2-base ambiguity codes 4; 3-base 2; N 1.
32// Source: bam_consensus.c static arrays, lines ~1908–1911 in 1.23.1.
33// * A C M G R S V T W Y H K D B N
34const SEQI2A: [u8; 16] = [0, 8, 0, 4, 0, 4, 0, 2, 0, 4, 0, 2, 0, 2, 0, 1];
35const SEQI2C: [u8; 16] = [0, 0, 8, 4, 0, 0, 4, 2, 0, 0, 4, 2, 0, 0, 2, 1];
36const SEQI2G: [u8; 16] = [0, 0, 0, 0, 8, 4, 4, 1, 0, 0, 0, 0, 4, 2, 2, 1];
37const SEQI2T: [u8; 16] = [0, 0, 0, 0, 0, 0, 0, 0, 8, 4, 4, 2, 8, 2, 2, 1];
38
39/// Score/depth slots: 0=A 1=C 2=G 3=T 4=* (gap).
40const N_BASES: usize = 5;
41
42// SAM FLAG bits (SAMv1 §1.4).
43const FLAG_UNMAP: u16 = 0x4;
44const FLAG_SECONDARY: u16 = 0x100;
45const FLAG_QCFAIL: u16 = 0x200;
46const FLAG_DUP: u16 = 0x400;
47
48// CIGAR op codes (BAM packed encoding, low nibble).
49const CIGAR_MATCH: u8 = 0;
50const CIGAR_INS: u8 = 1;
51const CIGAR_DEL: u8 = 2;
52const CIGAR_REF_SKIP: u8 = 3;
53const CIGAR_SOFT_CLIP: u8 = 4;
54const CIGAR_EQUAL: u8 = 7;
55const CIGAR_DIFF: u8 = 8;
56
57/// Default exclude-flags matching samtools consensus (UNMAP|SECONDARY|QCFAIL|DUP).
58pub const DEFAULT_EXCL_FLAGS: u16 = FLAG_UNMAP | FLAG_SECONDARY | FLAG_QCFAIL | FLAG_DUP;
59
60#[derive(Debug, Clone)]
61pub struct ConsensusOpts {
62 /// Use base qualities as weights (`-q`/`--use-qual`). Default off.
63 pub use_qual: bool,
64 /// Minimum base quality to count a base (`--min-BQ`). Default 0.
65 pub min_qual: u8,
66 /// Minimum depth to emit a non-N call (`-d`/`--min-depth`). Default 1.
67 pub min_depth: u32,
68 /// Minimum fraction of total score that the called allele(s) must reach
69 /// (`-c`/`--call-fract`). Default 0.75.
70 pub call_fract: f64,
71 /// Minimum ratio of second-best to best score to call a het
72 /// (`-H`/`--het-fract`). Default 0.5.
73 pub het_fract: f64,
74 /// Enable IUPAC ambiguity codes (`-A`/`--ambig`). Default off.
75 pub ambig: bool,
76 /// Emit deletion (`*`) bases (`--show-del yes`). Default false.
77 pub show_del: bool,
78 /// FASTA/Q line length (`-l`/`--line-len`). Default 70.
79 pub line_len: usize,
80 /// Reads with any of these FLAG bits are excluded. Default `DEFAULT_EXCL_FLAGS`.
81 pub excl_flags: u16,
82 /// Only include reads with any of these FLAG bits set (0 = no filter).
83 pub incl_flags: u16,
84 /// Minimum mapping quality (`--min-MQ`). Default 0.
85 pub min_mapq: u8,
86}
87
88impl Default for ConsensusOpts {
89 fn default() -> Self {
90 Self {
91 use_qual: false,
92 min_qual: 0,
93 min_depth: 1,
94 call_fract: 0.75,
95 het_fract: 0.5,
96 ambig: false,
97 show_del: false,
98 line_len: 70,
99 excl_flags: DEFAULT_EXCL_FLAGS,
100 incl_flags: 0,
101 min_mapq: 0,
102 }
103 }
104}
105
106#[derive(Debug, Default, Clone, Serialize)]
107pub struct ConsensusStats {
108 pub sequences: u64,
109 pub positions: u64,
110}
111
112/// `bam_consensus.c::calculate_consensus_simple` — returns ASCII base call and
113/// quality (0–100 as a percentage of total score).
114///
115/// `reads`: `(base4, qual)` pairs, one per non-refskip read at this column.
116/// `base4` is the 4-bit seq_nt16 value; gap/del is encoded as 16.
117pub fn simple_call(reads: &[(u8, u8)], opts: &ConsensusOpts) -> (u8, u8) {
118 // IUPAC het table: maps seqi-bit OR-value to ASCII.
119 // Replicates `const char *het = "NACMGRSVTWYHKDBN*ac?g???t???????";` from
120 // bam_consensus.c `calculate_consensus_simple`.
121 const HET: &[u8; 32] = b"NACMGRSVTWYHKDBN*ac?g???t???????";
122
123 let mut score = [0u64; N_BASES]; // A C G T *
124 let mut tot_depth: u32 = 0;
125
126 for &(b4, q) in reads {
127 if q < opts.min_qual {
128 continue;
129 }
130 let w: u64 = if opts.use_qual { q as u64 } else { 1 };
131 if b4 < 16 {
132 let b = b4 as usize;
133 let qa = SEQI2A[b] as u64 * w;
134 let qc = SEQI2C[b] as u64 * w;
135 let qg = SEQI2G[b] as u64 * w;
136 let qt = SEQI2T[b] as u64 * w;
137 if qa > 0 {
138 score[0] += qa;
139 }
140 if qc > 0 {
141 score[1] += qc;
142 }
143 if qg > 0 {
144 score[2] += qg;
145 }
146 if qt > 0 {
147 score[3] += qt;
148 }
149 } else {
150 score[4] += 8 * w;
151 }
152 tot_depth += 1;
153 }
154
155 let tscore: u64 = score.iter().sum();
156
157 // Find best (call1) and second-best (call2).
158 // Slot i → seqi bit = 1 << i (0→A=1, 1→C=2, 2→G=4, 3→T=8, 4→*=16).
159 let mut call1: usize = 15; // seqi 15 = N
160 let mut call2: usize = 15;
161 let mut score1: u64 = 0;
162 let mut score2: u64 = 0;
163
164 for (i, &s) in score.iter().enumerate() {
165 if s > score1 {
166 score2 = score1;
167 call2 = call1;
168 score1 = s;
169 call1 = 1 << i;
170 } else if s > score2 {
171 score2 = s;
172 call2 = 1 << i;
173 }
174 }
175
176 let mut used_base = call1;
177 let mut used_score = score1;
178
179 if opts.ambig && score1 > 0 && (score2 as f64) >= opts.het_fract * score1 as f64 {
180 used_base |= call2;
181 used_score += score2;
182 }
183
184 // Depth / fraction gate.
185 if tot_depth < opts.min_depth
186 || (tscore > 0 && (used_score as f64) < opts.call_fract * tscore as f64)
187 {
188 used_base = if call1 == 16 { 16 } else { 0 };
189 }
190
191 let cb = HET[used_base.min(31)];
192 let cq: u8 = if used_base != 0 && tscore > 0 {
193 (100 * used_score / tscore).min(100) as u8
194 } else {
195 0
196 };
197
198 (cb, cq)
199}
200
201// ---------------------------------------------------------------------------
202// Lightweight pileup walker — consensus-specific, no HashMap overhead.
203// Mirrors samtools consensus_pileup.c: a sorted buffer of active reads,
204// a reference cursor, and a CIGAR walk per read.
205// ---------------------------------------------------------------------------
206
207/// One active read in the pileup buffer.
208///
209/// At construction we flatten the CIGAR into a flat `slots` array indexed by
210/// reference offset from `beg`. Each slot encodes `(base4, qual, flags)` in
211/// three bytes, making the per-column `at()` an O(1) array lookup with no
212/// pointer-chasing into the raw record bytes.
213///
214/// `base4` values: 0–15 = seq_nt16 base; 16 = deletion; 17 = ref-skip (intron).
215struct ActiveRead {
216 /// Reference start (inclusive, 0-based) of this read.
217 beg: i64,
218 /// Reference end (exclusive, 0-based).
219 end: i64,
220 /// tid of the read.
221 tid: i32,
222 /// Flattened per-reference-position data: (base4, qual).
223 /// Indexed by (ref_pos - beg) — length == (end - beg).
224 slots: Vec<(u8, u8)>,
225}
226
227/// Sentinel slot values for deletions and ref-skips.
228const SLOT_DEL: (u8, u8) = (16, 0);
229const SLOT_SKIP: (u8, u8) = (17, 0);
230
231impl ActiveRead {
232 /// Construct from a raw record, optionally reusing a recycled slot Vec from
233 /// the pool to avoid a malloc.
234 fn new(rec: RawRecord, mut slots: Vec<(u8, u8)>) -> Self {
235 let beg = i64::from(rec.alignment_start());
236 let tid = rec.reference_sequence_id();
237 let qscores: &[u8] = rec.quality_scores();
238
239 // Compute reference span for pre-allocation hint.
240 let span = {
241 let mut s: i64 = 0;
242 for (op, l) in rec.cigar_ops() {
243 match op {
244 CIGAR_MATCH | CIGAR_DEL | CIGAR_REF_SKIP | CIGAR_EQUAL | CIGAR_DIFF => {
245 s += i64::from(l);
246 }
247 _ => {}
248 }
249 }
250 s as usize
251 };
252 slots.clear();
253 slots.reserve(span.saturating_sub(slots.capacity()));
254
255 // Walk CIGAR once and fill slots.
256 let mut qpos: usize = 0;
257 for (op, l) in rec.cigar_ops() {
258 let l = l as usize;
259 match op {
260 CIGAR_MATCH | CIGAR_EQUAL | CIGAR_DIFF => {
261 for i in 0..l {
262 let qi = qpos + i;
263 let b4 = rec.seq_nibble(qi);
264 let q = if qi < qscores.len() { qscores[qi] } else { 0 };
265 slots.push((b4, q));
266 }
267 qpos += l;
268 }
269 CIGAR_DEL => {
270 slots.extend(std::iter::repeat_n(SLOT_DEL, l));
271 }
272 CIGAR_REF_SKIP => {
273 slots.extend(std::iter::repeat_n(SLOT_SKIP, l));
274 }
275 CIGAR_INS | CIGAR_SOFT_CLIP => {
276 qpos += l;
277 }
278 _ => {}
279 }
280 }
281
282 let end = beg + slots.len() as i64;
283 ActiveRead {
284 beg,
285 end,
286 tid,
287 slots,
288 }
289 }
290
291 /// Return the slot Vec to the pool for reuse.
292 fn retire(self, pool: &mut Vec<Vec<(u8, u8)>>) {
293 if pool.len() < 128 {
294 pool.push(self.slots);
295 }
296 }
297
298 /// O(1) lookup: returns `(base4, qual, is_del, is_refskip)` at reference `pos`.
299 #[inline]
300 fn at(&self, pos: i64) -> (u8, u8, bool, bool) {
301 let off = (pos - self.beg) as usize;
302 if off >= self.slots.len() {
303 return (0, 0, true, false);
304 }
305 let (b4, q) = self.slots[off];
306 match b4 {
307 16 => (16, 0, true, false), // deletion
308 17 => (16, 0, true, true), // ref-skip
309 b => (b, q, false, false),
310 }
311 }
312}
313
314/// Write a FASTA record to `out`, wrapping at `line_len`.
315fn write_fasta(
316 out: &mut dyn Write,
317 name: &str,
318 seq: &[u8],
319 line_len: usize,
320) -> std::io::Result<()> {
321 if seq.is_empty() {
322 return Ok(());
323 }
324 writeln!(out, ">{name}")?;
325 for chunk in seq.chunks(line_len) {
326 out.write_all(chunk)?;
327 out.write_all(b"\n")?;
328 }
329 Ok(())
330}
331
332/// Compute simple-mode FASTA consensus for every covered contig.
333pub fn consensus(
334 bam_path: &Path,
335 out: &mut dyn Write,
336 opts: &ConsensusOpts,
337 workers: NonZero<usize>,
338) -> Result<ConsensusStats> {
339 let mut stats = ConsensusStats::default();
340
341 // raw::read_record bypasses the noodles record layer and reads directly from
342 // the inner BufRead via reader.get_mut(). bgzf::io::MultithreadedReader's
343 // BufRead impl interacts poorly with header-then-raw-read sequences across
344 // platforms, so we always use a single-threaded reader here.
345 let _ = workers;
346 let st = NonZero::<usize>::new(1).unwrap();
347 let mut reader = rsomics_bamio::open_with_workers(bam_path, st)?;
348 let header = reader.read_header().map_err(RsomicsError::Io)?;
349
350 let ref_names: Vec<String> = header
351 .reference_sequences()
352 .keys()
353 .map(ToString::to_string)
354 .collect();
355
356 let reader_mut = reader.get_mut();
357 // Two pre-allocated record buffers; we swap between them to avoid per-record
358 // heap allocation. `read_buf` is passed to read_record; `next_rec` holds the
359 // lookahead. When we consume next_rec into the active set, we std::mem::swap
360 // the two so read_buf reuses next_rec's allocation for the subsequent read.
361 let mut read_buf = RawRecord::default();
362 let mut next_rec: Option<RawRecord> = None;
363
364 // Active read buffer: reads whose alignment span covers the current cursor.
365 let mut active: Vec<ActiveRead> = Vec::with_capacity(512);
366 // Minimum end position among all active reads — used to skip retain() when
367 // cursor hasn't yet reached any read's end. Recomputed after retain().
368 let mut active_min_end: i64 = i64::MAX;
369 // Per-column base buffer: (base4, qual) pairs for simple_call.
370 let mut col_buf: Vec<(u8, u8)> = Vec::with_capacity(512);
371 // Free list of recycled slot Vecs — avoids per-read malloc for the slot array.
372 // Pre-filled with 64 Vecs (capacity 300 = typical 150bp read × 2 for indels)
373 // so the initial burst of reads doesn't hit the global allocator.
374 let mut slot_pool: Vec<Vec<(u8, u8)>> = (0..64).map(|_| Vec::with_capacity(300)).collect();
375
376 // Current contig state.
377 let mut cur_tid: i32 = -1;
378 let mut cur_seq: Vec<u8> = Vec::new();
379 let mut last_pos: i64 = -1;
380
381 // Pileup cursor: (tid, pos) of next column to emit.
382 let mut cursor_tid: i32 = -1;
383 let mut cursor_pos: i64 = 0;
384
385 // Advance past filtered records: read into read_buf, swap into next_rec slot.
386 // Returns true if a record was successfully placed in next_rec.
387 macro_rules! read_next {
388 () => {{
389 let n = rsomics_bamio::raw::read_record(reader_mut, &mut read_buf)?;
390 if n > 0 {
391 // Swap: read_buf gets the old next_rec's Vec allocation (reuse),
392 // next_rec gets the freshly populated read_buf contents.
393 let mut tmp = next_rec.take().unwrap_or_default();
394 std::mem::swap(&mut read_buf, &mut tmp);
395 next_rec = Some(tmp);
396 true
397 } else {
398 next_rec = None;
399 false
400 }
401 }};
402 }
403
404 // Read the first record.
405 read_next!();
406
407 loop {
408 // Feed records that start at or before cursor_pos (or prime the cursor).
409 #[allow(clippy::while_let_loop)]
410 loop {
411 let Some(ref nr) = next_rec else { break };
412 let flag = nr.flags();
413 if nr.reference_sequence_id() < 0 || flag & FLAG_UNMAP != 0 {
414 read_next!();
415 continue;
416 }
417 if opts.excl_flags != 0 && flag & opts.excl_flags != 0 {
418 read_next!();
419 continue;
420 }
421 if opts.incl_flags != 0 && flag & opts.incl_flags == 0 {
422 read_next!();
423 continue;
424 }
425 if nr.mapping_quality() < opts.min_mapq {
426 read_next!();
427 continue;
428 }
429
430 let rtid = nr.reference_sequence_id();
431 let rbeg = i64::from(nr.alignment_start());
432
433 // If cursor not yet set, initialise to this read's start.
434 if cursor_tid < 0 {
435 cursor_tid = rtid;
436 cursor_pos = rbeg;
437 }
438
439 // Read is ahead of cursor — stop feeding.
440 if rtid > cursor_tid || (rtid == cursor_tid && rbeg > cursor_pos) {
441 break;
442 }
443
444 // Consume the lookahead record into the active set, then read next.
445 let cur_nr = next_rec.take().unwrap();
446 let recycled = slot_pool.pop().unwrap_or_default();
447 let ar = ActiveRead::new(cur_nr, recycled);
448 if ar.end < active_min_end {
449 active_min_end = ar.end;
450 }
451 active.push(ar);
452
453 read_next!();
454 }
455
456 // Prune reads that ended before cursor_pos. Skip the O(n) retain when
457 // no read has ended yet (cursor hasn't reached the nearest end).
458 if cursor_pos >= active_min_end {
459 // Drain expired reads and return their slot Vecs to the pool.
460 let mut i = 0;
461 while i < active.len() {
462 if active[i].tid != cursor_tid || active[i].end <= cursor_pos {
463 let retired = active.swap_remove(i);
464 retired.retire(&mut slot_pool);
465 } else {
466 i += 1;
467 }
468 }
469 active_min_end = active.iter().map(|r| r.end).fold(i64::MAX, i64::min);
470 }
471
472 if active.is_empty() {
473 active_min_end = i64::MAX;
474 // No active reads: either done or jump to next read's position.
475 let Some(ref nr) = next_rec else {
476 // All input consumed — emit final contig.
477 break;
478 };
479 // Jump cursor to next read.
480 let rtid = nr.reference_sequence_id();
481 let rbeg = i64::from(nr.alignment_start());
482 if rtid != cursor_tid {
483 // Contig switch with no active reads — flush current contig.
484 if cur_tid >= 0 && !cur_seq.is_empty() {
485 let name = ref_names.get(cur_tid as usize).ok_or_else(|| {
486 RsomicsError::InvalidInput(format!("tid {cur_tid} not in header"))
487 })?;
488 write_fasta(out, name, &cur_seq, opts.line_len).map_err(RsomicsError::Io)?;
489 stats.sequences += 1;
490 cur_seq.clear();
491 cur_tid = -1;
492 last_pos = -1;
493 }
494 }
495 cursor_tid = rtid;
496 cursor_pos = rbeg;
497 continue;
498 }
499
500 // Emit column at (cursor_tid, cursor_pos).
501 let emit_tid = cursor_tid;
502 let emit_pos = cursor_pos;
503
504 // Handle contig switch.
505 if emit_tid != cur_tid {
506 if cur_tid >= 0 && !cur_seq.is_empty() {
507 let name = ref_names.get(cur_tid as usize).ok_or_else(|| {
508 RsomicsError::InvalidInput(format!("tid {cur_tid} not in header"))
509 })?;
510 write_fasta(out, name, &cur_seq, opts.line_len).map_err(RsomicsError::Io)?;
511 stats.sequences += 1;
512 cur_seq.clear();
513 }
514 cur_tid = emit_tid;
515 last_pos = -1;
516 }
517
518 // Collect bases from active reads.
519 col_buf.clear();
520 for ar in &mut active {
521 let (b4, q, is_del, is_refskip) = ar.at(emit_pos);
522 if is_refskip {
523 continue;
524 }
525 col_buf.push((if is_del { 16 } else { b4 }, q));
526 }
527
528 let (cb, _cq) = simple_call(&col_buf, opts);
529
530 if cb == b'*' && !opts.show_del {
531 last_pos = emit_pos;
532 cursor_pos += 1;
533 continue;
534 }
535
536 // Fill gap between last covered position and this one with N.
537 if !cur_seq.is_empty() && emit_pos > last_pos + 1 {
538 let gap = (emit_pos - last_pos - 1) as usize;
539 cur_seq.extend(std::iter::repeat_n(b'N', gap));
540 stats.positions += gap as u64;
541 }
542
543 cur_seq.push(cb);
544 stats.positions += 1;
545 last_pos = emit_pos;
546 cursor_pos += 1;
547 }
548
549 // Emit final contig.
550 if cur_tid >= 0 && !cur_seq.is_empty() {
551 let name = ref_names
552 .get(cur_tid as usize)
553 .ok_or_else(|| RsomicsError::InvalidInput(format!("tid {cur_tid} not in header")))?;
554 write_fasta(out, name, &cur_seq, opts.line_len).map_err(RsomicsError::Io)?;
555 stats.sequences += 1;
556 }
557
558 Ok(stats)
559}