//! Pileup engine: walks a position-sorted set of alignment records and
//! emits per-position columns of overlapping bases, indels, and skips,
//! in the spirit of `samtools mpileup`.

use seqair_types::{BamFlags, Base, BaseQuality, Offset, Pos0, Strand, strand_from_flags};
use std::rc::Rc;
use crate::utils::TraceErr;
use super::{
cigar::{CigarMapping, CigarPosInfo},
record_store::RecordStore,
};
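/// A window of reference bases anchored at `start_pos`.
///
/// Out-of-window lookups return [`Base::Unknown`] (or `None` from the
/// fallible accessors) instead of panicking.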
pub struct RefSeq {
bases: Rc<[Base]>,
start_pos: Pos0,
}
impl std::fmt::Debug for RefSeq {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RefSeq")
.field("start_pos", &self.start_pos)
.field("len", &self.bases.len())
.finish()
}
}
impl RefSeq {
pub fn new(bases: Rc<[Base]>, start_pos: Pos0) -> Self {
Self { bases, start_pos }
}
#[inline]
pub fn start_pos(&self) -> Pos0 {
self.start_pos
}
#[inline]
pub fn len(&self) -> usize {
self.bases.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.bases.is_empty()
}
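    /// Returns the base at `pos`, or [`Base::Unknown`] if `pos` falls
    /// outside the stored window.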
    pub fn base_at(&self, pos: Pos0) -> Base {
        self.try_base_at(pos).unwrap_or(Base::Unknown)
    }
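    /// Like [`Self::base_at`], but returns `None` instead of
    /// [`Base::Unknown`] when `pos` is outside the stored window.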
pub fn try_base_at(&self, pos: Pos0) -> Option<Base> {
let offset = pos.as_i64().checked_sub(self.start_pos.as_i64())?;
if offset < 0 {
return None;
}
#[expect(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "offset is non-negative (checked above)"
)]
self.bases.get(offset as usize).copied()
}
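    /// Returns `len` bases starting at `pos`, or `None` if the requested
    /// window is not fully contained in this reference slice.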
pub fn range(&self, pos: Pos0, len: u32) -> Option<&[Base]> {
let offset = pos.as_i64().checked_sub(self.start_pos.as_i64())?;
if offset < 0 {
return None;
}
#[expect(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "offset is non-negative (checked above)"
)]
let start = offset as usize;
let end = start.checked_add(len as usize)?;
self.bases.get(start..end)
}
}
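/// Streams pileup columns over the inclusive region
/// `[region_start, region_end]`.
///
/// Records in the [`RecordStore`] must be sorted by start position: the
/// engine stops admitting new records at the first one starting past the
/// cursor. Positions with no overlapping records are skipped.
///
/// A minimal usage sketch (hypothetical; assumes a populated,
/// position-sorted `RecordStore<()>` named `store` and region bounds
/// `start`/`end`):
///
/// ```ignore
/// let mut engine = PileupEngine::new(store, start, end);
/// engine.set_max_depth(10_000);
/// while let Some(column) = engine.pileups() {
///     println!("{:?}: depth {}", column.pos(), column.depth());
/// }
/// ```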
pub struct PileupEngine<U = ()> {
store: RecordStore<U>,
current_pos: Pos0,
region_end: Pos0,
next_entry: usize,
active_end_pos: Vec<Pos0>,
active: Vec<ActiveRecord>,
max_depth: Option<u32>,
ref_seq: Option<RefSeq>,
columns_produced: u32,
max_active_depth: u32,
}
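/// A record overlapping the current cursor position, with its CIGAR
/// resolved into a [`CigarMapping`] and per-record fields copied out so
/// the store is not re-queried for every column.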
#[derive(Debug)]
struct ActiveRecord {
record_idx: u32,
cigar: CigarMapping,
flags: BamFlags,
strand: Strand,
mapq: u8,
seq_len: u32,
matching_bases: u32,
indel_bases: u32,
}
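/// Looks up the base and quality at query position `qpos`, falling back
/// to [`Base::Unknown`] / [`BaseQuality::UNAVAILABLE`] for records stored
/// without sequence data (`seq_len == 0`) or with a quality string
/// shorter than `qpos`.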
fn base_qual_at<U>(
store: &RecordStore<U>,
active: &ActiveRecord,
qpos: u32,
) -> (Base, BaseQuality) {
if active.seq_len == 0 {
return (Base::Unknown, BaseQuality::UNAVAILABLE);
}
let qual = store.qual(active.record_idx);
let q = qual.get(qpos as usize).copied().unwrap_or(BaseQuality::UNAVAILABLE);
(store.seq_at(active.record_idx, qpos as usize), q)
}
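/// A single pileup column: every active alignment's contribution at one
/// reference position, plus the reference base ([`Base::Unknown`] unless
/// a [`RefSeq`] was supplied via [`PileupEngine::set_reference_seq`]).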
pub struct PileupColumn<'store, U = ()> {
pos: Pos0,
reference_base: Base,
alignments: Vec<PileupAlignment>,
store: &'store RecordStore<U>,
}
impl<U> std::fmt::Debug for PileupColumn<'_, U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PileupColumn")
.field("pos", &self.pos)
.field("reference_base", &self.reference_base)
.field("depth", &self.alignments.len())
.finish_non_exhaustive()
}
}
impl<'store, U> PileupColumn<'store, U> {
#[must_use]
pub fn pos(&self) -> Pos0 {
self.pos
}
#[must_use]
pub fn depth(&self) -> usize {
self.alignments.len()
}
pub fn alignments(&self) -> impl Iterator<Item = AlignmentView<'_, 'store, U>> + '_ {
let store = self.store;
self.alignments.iter().map(move |aln| AlignmentView { aln, store })
}
pub fn raw_alignments(&self) -> impl Iterator<Item = &PileupAlignment> + '_ {
self.alignments.iter()
}
pub fn reference_base(&self) -> Base {
self.reference_base
}
pub fn store(&self) -> &'store RecordStore<U> {
self.store
}
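    /// Number of alignments contributing a base at this position (matches
    /// and insertions); deletions and reference skips are excluded.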
#[must_use]
pub fn match_depth(&self) -> usize {
self.alignments.iter().filter(|a| a.qpos().is_some()).count()
}
}
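/// A [`PileupAlignment`] paired with its backing [`RecordStore`]; derefs
/// to the alignment and adds access to per-record data (query name, aux
/// bytes, and the user `extra` payload) that the alignment itself only
/// references by index.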
pub struct AlignmentView<'a, 'store, U> {
aln: &'a PileupAlignment,
store: &'store RecordStore<U>,
}
impl<U> std::fmt::Debug for AlignmentView<'_, '_, U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AlignmentView").field("aln", &self.aln).finish_non_exhaustive()
}
}
impl<'a, 'store, U> AlignmentView<'a, 'store, U> {
pub fn extra(&self) -> &'store U {
self.store.extra(self.aln.record_idx())
}
pub fn qname(&self) -> &'store [u8] {
self.store.qname(self.aln.record_idx())
}
pub fn aux(&self) -> &'store [u8] {
self.store.aux(self.aln.record_idx())
}
pub fn alignment(&self) -> &'a PileupAlignment {
self.aln
}
pub fn store(&self) -> &'store RecordStore<U> {
self.store
}
}
impl<U> std::ops::Deref for AlignmentView<'_, '_, U> {
type Target = PileupAlignment;
fn deref(&self) -> &PileupAlignment {
self.aln
}
}
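/// How an alignment relates to the reference at a single pileup position.
///
/// `Match` covers aligned bases (matches and mismatches). `Insertion`
/// reports an aligned base that additionally carries an insertion of
/// `insert_len` bases. `ComplexIndel` combines a deletion (or, when
/// `is_refskip` is set, a reference skip) with an insertion.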
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PileupOp {
Match { qpos: u32, base: Base, qual: BaseQuality },
Insertion { qpos: u32, base: Base, qual: BaseQuality, insert_len: u32 },
Deletion { del_len: u32 },
ComplexIndel { del_len: u32, insert_len: u32, is_refskip: bool },
RefSkip,
}
const _: () = assert!(std::mem::size_of::<PileupOp>() <= 12, "PileupOp grew unexpectedly large");
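/// One alignment's contribution to a pileup column: the positional op
/// plus whole-record summary fields (mapping quality, flags, strand,
/// sequence length, and match/indel base counts).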
#[derive(Clone, Debug)]
pub struct PileupAlignment {
record_idx: u32,
pub op: PileupOp,
pub mapq: u8,
pub flags: BamFlags,
pub strand: Strand,
pub seq_len: u32,
pub matching_bases: u32,
pub indel_bases: u32,
}
impl PileupAlignment {
#[must_use]
pub fn record_idx(&self) -> u32 {
self.record_idx
}
#[must_use]
pub fn op(&self) -> &PileupOp {
&self.op
}
#[must_use]
pub fn qpos(&self) -> Option<usize> {
match self.op {
PileupOp::Match { qpos, .. } | PileupOp::Insertion { qpos, .. } => Some(qpos as usize),
PileupOp::Deletion { .. } | PileupOp::ComplexIndel { .. } | PileupOp::RefSkip => None,
}
}
#[must_use]
pub fn base(&self) -> Option<Base> {
match self.op {
PileupOp::Match { base, .. } | PileupOp::Insertion { base, .. } => Some(base),
PileupOp::Deletion { .. } | PileupOp::ComplexIndel { .. } | PileupOp::RefSkip => None,
}
}
#[must_use]
pub fn qual(&self) -> Option<BaseQuality> {
match self.op {
PileupOp::Match { qual, .. } | PileupOp::Insertion { qual, .. } => Some(qual),
PileupOp::Deletion { .. } | PileupOp::ComplexIndel { .. } | PileupOp::RefSkip => None,
}
}
#[must_use]
pub fn is_del(&self) -> bool {
matches!(self.op, PileupOp::Deletion { .. } | PileupOp::ComplexIndel { .. })
}
#[must_use]
pub fn is_refskip(&self) -> bool {
matches!(self.op, PileupOp::RefSkip | PileupOp::ComplexIndel { is_refskip: true, .. })
}
#[must_use]
pub fn insert_len(&self) -> u32 {
match self.op {
PileupOp::Insertion { insert_len, .. } | PileupOp::ComplexIndel { insert_len, .. } => {
insert_len
}
_ => 0,
}
}
#[must_use]
pub fn del_len(&self) -> u32 {
match self.op {
PileupOp::Deletion { del_len } | PileupOp::ComplexIndel { del_len, .. } => del_len,
_ => 0,
}
}
}
impl<U> PileupEngine<U> {
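    /// Builds an engine over the inclusive region
    /// `[region_start, region_end]`, taking ownership of the record store.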
pub fn new(store: RecordStore<U>, region_start: Pos0, region_end: Pos0) -> Self {
PileupEngine {
store,
current_pos: region_start,
region_end,
next_entry: 0,
active_end_pos: Vec::new(),
active: Vec::new(),
max_depth: None,
ref_seq: None,
columns_produced: 0,
max_active_depth: 0,
}
}
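    /// Supplies reference bases so columns report a real reference base
    /// instead of [`Base::Unknown`].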
pub fn set_reference_seq(&mut self, ref_seq: RefSeq) {
self.ref_seq = Some(ref_seq);
}
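    /// Caps the number of alignments reported per column. Alignments
    /// beyond the cap are truncated in store order, not subsampled.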
pub fn set_max_depth(&mut self, max: u32) {
self.max_depth = Some(max);
}
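    /// Number of positions from the cursor through `region_end`,
    /// inclusive; an upper bound on how many more columns can be produced.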
pub fn remaining_positions(&self) -> usize {
let diff = self
.region_end
.as_i64()
.checked_sub(self.current_pos.as_i64())
.and_then(|d| d.checked_add(1))
.unwrap_or(0);
#[expect(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "diff.max(0) is non-negative; bounded by region size which fits in usize on supported platforms"
)]
let r = diff.max(0) as usize;
r
}
pub fn store(&self) -> &RecordStore<U> {
&self.store
}
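    /// Takes the record store out of the engine, cleared for reuse.
    /// Returns `None` when there is nothing worth recycling (the store is
    /// empty and holds no allocated capacity).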
pub fn take_store(&mut self) -> Option<RecordStore<U>> {
if self.store.is_empty() && self.store.records_capacity() == 0 {
return None;
}
let mut store = self.store.take_contents();
store.clear();
Some(store)
}
}
impl<U> std::fmt::Debug for PileupEngine<U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PileupEngine")
.field("current_pos", &self.current_pos)
.field("region_end", &self.region_end)
.field("next_entry", &self.next_entry)
.field("active_count", &self.active.len())
.field("max_depth", &self.max_depth)
.field("columns_produced", &self.columns_produced)
.finish_non_exhaustive()
}
}
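/// Guard that, on drop, clears the engine's [`RecordStore`] and hands it
/// back to the borrowed slot so its allocations can be reused for the
/// next region. Call [`Self::into_inner`] to keep the engine and skip the
/// hand-back.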
pub struct PileupGuard<'a, U = ()> {
engine: Option<PileupEngine<U>>,
slot: &'a mut RecordStore<U>,
}
impl<'a, U> PileupGuard<'a, U> {
pub(crate) fn new(engine: PileupEngine<U>, slot: &'a mut RecordStore<U>) -> Self {
Self { engine: Some(engine), slot }
}
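    /// Consumes the guard and returns the engine, skipping the store
    /// hand-back that would otherwise happen on drop.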
pub fn into_inner(mut self) -> PileupEngine<U> {
self.engine.take().expect("engine present until into_inner is called")
}
fn engine_ref(&self) -> &PileupEngine<U> {
self.engine.as_ref().expect("engine present until guard is dropped or into_inner is called")
}
fn engine_mut(&mut self) -> &mut PileupEngine<U> {
self.engine.as_mut().expect("engine present until guard is dropped or into_inner is called")
}
}
impl<U> std::ops::Deref for PileupGuard<'_, U> {
type Target = PileupEngine<U>;
fn deref(&self) -> &PileupEngine<U> {
self.engine_ref()
}
}
impl<U> std::ops::DerefMut for PileupGuard<'_, U> {
fn deref_mut(&mut self) -> &mut PileupEngine<U> {
self.engine_mut()
}
}
impl<U> std::fmt::Debug for PileupGuard<'_, U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PileupGuard").field("engine", &self.engine).finish_non_exhaustive()
}
}
impl<U> Drop for PileupGuard<'_, U> {
fn drop(&mut self) {
if let Some(mut engine) = self.engine.take()
&& let Some(store) = engine.take_store()
{
*self.slot = store;
}
}
}
impl<U> PileupEngine<U> {
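    /// Core stepping loop behind [`Self::pileups`]: advances the cursor
    /// one position at a time, expiring active records that end before
    /// the cursor, admitting records that start at or before it (skipping
    /// unmapped ones), and returning the first position with a non-empty
    /// alignment set. When nothing is active, the cursor jumps straight
    /// to the next record's start instead of scanning empty positions.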
fn advance(&mut self) -> Option<(Pos0, Base, Vec<PileupAlignment>)> {
loop {
if self.current_pos > self.region_end {
return None;
}
let pos = self.current_pos;
#[allow(
clippy::unwrap_in_result,
reason = "infallible: current_pos <= region_end < u32::MAX - 1"
)]
{
self.current_pos = self
.current_pos
.checked_add_offset(Offset::new(1))
.trace_err("BUG: current_pos + 1 overflowed despite being <= region_end")?;
}
{
let mut i = 0;
while i < self.active_end_pos.len() {
debug_assert!(i < self.active.len());
#[allow(
clippy::indexing_slicing,
reason = "i < active_end_pos.len() == active.len()"
)]
if self.active_end_pos[i] < pos {
self.active_end_pos.swap_remove(i);
self.active.swap_remove(i);
} else {
i = i.checked_add(1).trace_err("active set size exceeded usize::MAX")?;
}
}
}
while self.next_entry < self.store.len() {
#[expect(
clippy::cast_possible_truncation,
reason = "RecordStore capacity is bounded by SlabOverflow (u32); debug_assert enforces invariant"
)]
let idx = self.next_entry as u32;
debug_assert_eq!(idx as usize, self.next_entry, "next_entry exceeds u32::MAX");
let rec = self.store.record(idx);
if rec.pos > pos {
break;
}
self.next_entry =
self.next_entry.checked_add(1).trace_err("next_entry exceeded usize::MAX")?;
if rec.end_pos < pos {
continue;
}
if rec.flags.is_unmapped() {
continue;
}
let cigar = CigarMapping::new(rec.pos, self.store.cigar(idx))
.trace_err("failed to generate cigar mapping")?;
self.active_end_pos.push(rec.end_pos);
self.active.push(ActiveRecord {
record_idx: idx,
cigar,
flags: rec.flags,
strand: strand_from_flags(rec.flags),
mapq: rec.mapq,
seq_len: rec.seq_len,
matching_bases: rec.matching_bases,
indel_bases: rec.indel_bases,
});
}
if self.active.is_empty() {
if self.next_entry >= self.store.len() {
return None;
}
#[expect(
clippy::cast_possible_truncation,
reason = "RecordStore capacity is bounded by SlabOverflow (u32); debug_assert enforces invariant"
)]
let next_entry_u32 = self.next_entry as u32;
debug_assert_eq!(
next_entry_u32 as usize, self.next_entry,
"next_entry exceeds u32::MAX"
);
let next_pos = self.store.record(next_entry_u32).pos;
if next_pos > self.current_pos {
self.current_pos = next_pos;
}
continue;
}
let mut alignments = Vec::with_capacity(self.active.len());
for active in &self.active {
let Some(info) = active.cigar.pos_info_at(pos) else { continue };
let op = match info {
CigarPosInfo::Match { qpos } => {
let (base, qual) = base_qual_at(&self.store, active, qpos);
PileupOp::Match { qpos, base, qual }
}
CigarPosInfo::Insertion { qpos, insert_len } => {
let (base, qual) = base_qual_at(&self.store, active, qpos);
PileupOp::Insertion { qpos, base, qual, insert_len }
}
CigarPosInfo::Deletion { del_len } => PileupOp::Deletion { del_len },
CigarPosInfo::ComplexIndel { del_len, insert_len, is_refskip } => {
PileupOp::ComplexIndel { del_len, insert_len, is_refskip }
}
CigarPosInfo::RefSkip => PileupOp::RefSkip,
};
alignments.push(PileupAlignment {
op,
mapq: active.mapq,
flags: active.flags,
strand: active.strand,
seq_len: active.seq_len,
matching_bases: active.matching_bases,
indel_bases: active.indel_bases,
record_idx: active.record_idx,
});
}
if let Some(max) = self.max_depth {
alignments.truncate(max as usize);
}
if !alignments.is_empty() {
self.columns_produced = self.columns_produced.saturating_add(1);
#[expect(
clippy::cast_possible_truncation,
reason = "depth is bounded by max_depth (u32) or typical pileup sizes; saturates at u32::MAX for profiling"
)]
let depth_u32 = alignments.len() as u32;
self.max_active_depth = self.max_active_depth.max(depth_u32);
let reference_base =
self.ref_seq.as_ref().map_or(Base::Unknown, |r| r.base_at(pos));
return Some((pos, reference_base, alignments));
}
}
}
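    /// Returns the next non-empty pileup column, or `None` once the
    /// region is exhausted. The column borrows the engine's store, so
    /// this is a lending-iterator style API rather than [`Iterator`].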
pub fn pileups(&mut self) -> Option<PileupColumn<'_, U>> {
let (pos, reference_base, alignments) = self.advance()?;
Some(PileupColumn { pos, reference_base, alignments, store: &self.store })
}
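    /// Iterator-style size hint: between zero and one column per
    /// remaining position.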
pub fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(self.remaining_positions()))
}
}
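/// Emits a `tracing` debug event with column/depth/capacity counters for
/// profiling, but only if at least one column was produced.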
impl<U> Drop for PileupEngine<U> {
fn drop(&mut self) {
if self.columns_produced > 0 {
tracing::debug!(
target: super::region_buf::PROFILE_TARGET,
columns = self.columns_produced,
max_depth = self.max_active_depth,
active_cap = self.active.capacity().saturating_add(self.active_end_pos.capacity()),
"pileup_engine",
);
}
}
}
#[cfg(test)]
#[allow(clippy::arithmetic_side_effects, reason = "test arithmetic on known small values")]
mod tests {
use super::*;
#[test]
fn ref_seq_base_at_within_range() {
let ref_seq =
RefSeq::new(Rc::from([Base::A, Base::C, Base::G, Base::T]), Pos0::new(100).unwrap());
assert_eq!(ref_seq.base_at(Pos0::new(100).unwrap()), Base::A);
assert_eq!(ref_seq.base_at(Pos0::new(101).unwrap()), Base::C);
assert_eq!(ref_seq.base_at(Pos0::new(102).unwrap()), Base::G);
assert_eq!(ref_seq.base_at(Pos0::new(103).unwrap()), Base::T);
}
#[test]
fn ref_seq_base_at_before_start() {
let ref_seq = RefSeq::new(Rc::from([Base::A, Base::C]), Pos0::new(100).unwrap());
assert_eq!(ref_seq.base_at(Pos0::new(99).unwrap()), Base::Unknown);
assert_eq!(ref_seq.base_at(Pos0::new(0).unwrap()), Base::Unknown);
}
#[test]
fn ref_seq_base_at_after_end() {
let ref_seq = RefSeq::new(Rc::from([Base::A, Base::C]), Pos0::new(100).unwrap());
assert_eq!(ref_seq.base_at(Pos0::new(102).unwrap()), Base::Unknown);
assert_eq!(ref_seq.base_at(Pos0::new(1000).unwrap()), Base::Unknown);
}
#[test]
fn ref_seq_base_at_empty() {
let ref_seq = RefSeq::new(Rc::from([]), Pos0::new(100).unwrap());
assert_eq!(ref_seq.base_at(Pos0::new(100).unwrap()), Base::Unknown);
}
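    // Coverage for the fallible accessors, mirroring the `base_at` tests
    // above: `try_base_at` returns `None` (not `Base::Unknown`) out of
    // range, and `range` requires the whole window to be in bounds.
    #[test]
    fn ref_seq_try_base_at() {
        let ref_seq = RefSeq::new(Rc::from([Base::A, Base::C]), Pos0::new(100).unwrap());
        assert_eq!(ref_seq.try_base_at(Pos0::new(100).unwrap()), Some(Base::A));
        assert_eq!(ref_seq.try_base_at(Pos0::new(99).unwrap()), None);
        assert_eq!(ref_seq.try_base_at(Pos0::new(102).unwrap()), None);
    }
    #[test]
    fn ref_seq_range() {
        let ref_seq =
            RefSeq::new(Rc::from([Base::A, Base::C, Base::G, Base::T]), Pos0::new(100).unwrap());
        assert_eq!(ref_seq.range(Pos0::new(101).unwrap(), 2), Some(&[Base::C, Base::G][..]));
        assert_eq!(ref_seq.range(Pos0::new(99).unwrap(), 1), None);
        assert_eq!(ref_seq.range(Pos0::new(102).unwrap(), 5), None);
    }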
}