use super::{
ReaderError,
indexed::IndexedReader,
segment::{IntoSegmentTarget, Segment, SegmentOptions, Segments},
};
use crate::{
bam::{
BamHeader,
pileup::{PileupEngine, PileupGuard, RefSeq},
record_store::{CustomizeRecordStore, RecordStore},
},
fasta::{FastaError, IndexedFastaReader},
};
use seqair_types::{Base, Pos0};
use std::path::Path;
use std::rc::Rc;
use tracing::instrument;
/// Bundle of the file handles and scratch state needed to read alignments
/// together with their reference sequence.
///
/// `E` is a [`CustomizeRecordStore`] hook that lets callers influence how
/// records are loaded; the default `()` performs no customization.
pub struct Readers<E: CustomizeRecordStore = ()> {
    // Indexed alignment (BAM) reader used for region fetches.
    pub(crate) alignment: IndexedReader,
    // Indexed FASTA reader providing reference bases.
    pub(crate) fasta: IndexedFastaReader,
    // Reusable record storage; temporarily moved into the pileup engine by
    // `pileup()` and recovered when the guard is dropped.
    pub(crate) store: RecordStore<E::Extra>,
    // Scratch buffer reused across FASTA fetches to avoid reallocating.
    pub(crate) fasta_buf: Vec<u8>,
    // User-supplied record-store customization hook.
    pub(crate) customize: E,
}
impl<E: CustomizeRecordStore> std::fmt::Debug for Readers<E> {
    /// Formats a compact debug view showing only the `alignment` reader.
    ///
    /// The remaining fields (FASTA reader, record store, scratch buffer,
    /// customization hook) are deliberately omitted; `finish_non_exhaustive`
    /// renders a trailing `..` so the output makes that omission explicit
    /// instead of presenting `alignment` as the complete field list.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Readers")
            .field("alignment", &self.alignment)
            .finish_non_exhaustive()
    }
}
impl Readers<()> {
    /// Opens the alignment and FASTA readers with no record-store
    /// customization (the unit `()` hook).
    ///
    /// # Errors
    /// Propagates any error from [`Readers::open_customized`].
    #[instrument(level = "debug", fields(alignment = %alignment_path.display(), fasta = %fasta_path.display()))]
    pub fn open(alignment_path: &Path, fasta_path: &Path) -> Result<Self, ReaderError> {
        Self::open_customized(alignment_path, fasta_path, ())
    }
}
impl<E: CustomizeRecordStore> Readers<E> {
    /// Opens the alignment and FASTA readers with a caller-supplied
    /// record-store customization hook.
    ///
    /// # Errors
    /// Returns [`ReaderError::FastaOpen`] if the FASTA cannot be opened, and
    /// propagates the alignment reader's error if that open fails.
    pub fn open_customized(
        alignment_path: &Path,
        fasta_path: &Path,
        customize: E,
    ) -> Result<Self, ReaderError> {
        let fasta = IndexedFastaReader::open(fasta_path)
            .map_err(|source| ReaderError::FastaOpen { source })?;
        // The alignment reader is also given the FASTA path (see
        // `IndexedReader::open_with_fasta` for how it is used).
        let alignment = IndexedReader::open_with_fasta(alignment_path, fasta_path)?;
        Ok(Readers {
            alignment,
            fasta,
            store: RecordStore::default(),
            fasta_buf: Vec::new(),
            customize,
        })
    }
    /// Creates an independent `Readers` from this one.
    ///
    /// Both underlying readers are forked; the new instance starts with an
    /// empty record store and scratch buffer, and a clone of the hook.
    ///
    /// # Errors
    /// Propagates fork failures from either underlying reader
    /// ([`ReaderError::FastaFork`] for the FASTA side).
    pub fn fork(&self) -> Result<Self, ReaderError> {
        let alignment = self.alignment.fork()?;
        let fasta = self.fasta.fork().map_err(|source| ReaderError::FastaFork { source })?;
        Ok(Readers {
            alignment,
            fasta,
            store: RecordStore::default(),
            fasta_buf: Vec::new(),
            customize: self.customize.clone(),
        })
    }
    /// Returns the alignment file's header.
    pub fn header(&self) -> &BamHeader {
        self.alignment.header()
    }
    /// Fetches the records for the given region into `store`, returning the
    /// count reported by the underlying alignment reader.
    ///
    /// NOTE(review): whether `end` is inclusive is determined by
    /// `IndexedReader::fetch_into` — confirm there.
    ///
    /// # Errors
    /// Propagates fetch errors from the alignment reader.
    pub fn fetch_into(
        &mut self,
        tid: u32,
        start: Pos0,
        end: Pos0,
        store: &mut RecordStore<()>,
    ) -> Result<usize, ReaderError> {
        self.alignment.fetch_into(tid, start, end, store)
    }
    /// Returns a shared reference to the customization hook.
    pub fn customize(&self) -> &E {
        &self.customize
    }
    /// Returns a mutable reference to the customization hook.
    pub fn customize_mut(&mut self) -> &mut E {
        &mut self.customize
    }
    /// Resolves `target` against this reader's header and returns the
    /// resulting segment iterator configured with `opts`.
    ///
    /// # Errors
    /// Propagates resolution failures from `segment::resolve_target` (e.g.
    /// a target that does not exist in the header).
    pub fn segments(
        &self,
        target: impl IntoSegmentTarget,
        opts: SegmentOptions,
    ) -> Result<Segments, ReaderError> {
        let ranges = super::segment::resolve_target(target, self.header())?;
        Ok(Segments::new(ranges, opts))
    }
    /// Builds a pileup over `segment`, returning a guard that drives the
    /// engine.
    ///
    /// Validates that the segment agrees with this reader's header, fetches
    /// the overlapping records and the reference bases for the region, then
    /// moves the internal record store into a [`PileupEngine`]. The returned
    /// [`PileupGuard`] is handed a mutable reference to the store slot so the
    /// store (and its allocation) can be recovered when the guard is dropped
    /// — see the tests in this file for the recovery contract.
    ///
    /// # Errors
    /// - [`ReaderError::SegmentHeaderMismatch`] if the segment's contig name
    ///   does not resolve to the segment's tid in the header.
    /// - [`ReaderError::SegmentContigLengthMismatch`] if the header's contig
    ///   length disagrees with the segment's recorded last position.
    /// - Fetch errors from the alignment reader or
    ///   [`ReaderError::FastaFetch`] from the FASTA side.
    pub fn pileup(&mut self, segment: &Segment) -> Result<PileupGuard<'_, E::Extra>, ReaderError> {
        let tid = segment.tid();
        let start = segment.start();
        let end = segment.end();
        let header = self.alignment.header();
        // The segment's contig name must resolve to the segment's own tid in
        // this header; otherwise the segment was built against another file.
        match header.tid(segment.contig().as_str()) {
            Some(t) if t == tid.as_u32() => {}
            _ => {
                return Err(ReaderError::SegmentHeaderMismatch {
                    contig: segment.contig().clone(),
                    expected_tid: tid.as_u32(),
                });
            }
        }
        // Header length -> last 0-based position (None for a zero-length or
        // unknown contig), compared against what the segment recorded.
        let header_last_pos = header.target_len(tid.as_u32()).and_then(|len| len.checked_sub(1));
        if header_last_pos != Some(segment.contig_last_pos().as_u64()) {
            return Err(ReaderError::SegmentContigLengthMismatch {
                contig: segment.contig().clone(),
                segment_last_pos: segment.contig_last_pos().as_u64(),
                header_last_pos,
            });
        }
        // Split the borrows so the fetch can use the reader, the store, and
        // the customize hook simultaneously.
        let alignment = &mut self.alignment;
        let store = &mut self.store;
        let customize = &mut self.customize;
        alignment.fetch_into_customized(tid.as_u32(), start, end, store, customize)?;
        let contig_name = segment.contig();
        // `end + 1` so the (exclusive-stop) FASTA fetch covers `end` itself;
        // saturating to guard against u64 overflow at the extreme.
        let stop_u64 = end.as_u64().saturating_add(1);
        self.fasta
            .fetch_seq_into_u64(contig_name, start.as_u64(), stop_u64, &mut self.fasta_buf)
            .map_err(|source| ReaderError::FastaFetch {
                contig: contig_name.clone(),
                start: start.as_u64(),
                end: end.as_u64(),
                source,
            })?;
        // Convert the fetched ASCII bases to `Base` in place (reusing
        // `fasta_buf`'s allocation), then share the slice via `Rc`.
        let bases: &[Base] = Base::convert_ascii_in_place_as_slice(&mut self.fasta_buf);
        let ref_seq = RefSeq::new(Rc::from(bases), start);
        // Temporarily move the store into the engine, leaving an empty
        // default in `self.store`; the guard writes it back on drop.
        let store = std::mem::take(&mut self.store);
        let mut engine = PileupEngine::new(store, start, end);
        engine.set_reference_seq(ref_seq);
        Ok(PileupGuard::new(engine, &mut self.store))
    }
    /// Returns a shared reference to the FASTA reader.
    pub fn fasta(&self) -> &IndexedFastaReader {
        &self.fasta
    }
    /// Returns a mutable reference to the FASTA reader.
    pub fn fasta_mut(&mut self) -> &mut IndexedFastaReader {
        &mut self.fasta
    }
    /// Fetches reference bases for `name` over `start..stop` (range
    /// semantics follow `IndexedFastaReader::fetch_seq_into`) and returns
    /// them as a shared `Rc<[Base]>`.
    ///
    /// Reuses the internal scratch buffer, so repeated calls do not
    /// reallocate once the buffer has grown to fit.
    ///
    /// # Errors
    /// Propagates [`FastaError`] from the underlying fetch.
    pub fn fetch_base_seq(
        &mut self,
        name: &str,
        start: Pos0,
        stop: Pos0,
    ) -> Result<Rc<[Base]>, FastaError> {
        self.fasta.fetch_seq_into(name, start, stop, &mut self.fasta_buf)?;
        let bases: &[Base] = Base::convert_ascii_in_place_as_slice(&mut self.fasta_buf);
        Ok(Rc::from(bases))
    }
    /// Returns a shared reference to the alignment reader.
    pub fn alignment(&self) -> &IndexedReader {
        &self.alignment
    }
    /// Returns a mutable reference to the alignment reader.
    pub fn alignment_mut(&mut self) -> &mut IndexedReader {
        &mut self.alignment
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, reason = "test code")]
mod tests {
    // Integration tests against the small BAM/FASTA fixtures under
    // tests/data. They pin the buffer-reuse and store-recovery contracts
    // of `Readers`, plus the segment/header validation in `pileup()`.
    use super::*;
    use crate::reader::segment::Segment;
    use seqair_types::{Pos0, SmolStr};
    fn test_bam_path() -> &'static Path {
        Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data/test.bam"))
    }
    fn test_fasta_path() -> &'static Path {
        Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data/test.fasta.gz"))
    }
    // fetch_base_seq must reuse `fasta_buf` across calls: the capacity
    // after the second identical fetch must match the first exactly.
    #[test]
    fn fetch_base_seq_retains_buffer_capacity() {
        let mut readers = Readers::open(test_bam_path(), test_fasta_path()).unwrap();
        let start = Pos0::new(6_100_000).unwrap();
        let stop = Pos0::new(6_101_000).unwrap();
        let _first = readers.fetch_base_seq("chr19", start, stop).unwrap();
        let cap_after_first = readers.fasta_buf.capacity();
        assert!(cap_after_first > 0, "buffer should have grown to hold the fetched region");
        let _second = readers.fetch_base_seq("chr19", start, stop).unwrap();
        assert_eq!(
            cap_after_first,
            readers.fasta_buf.capacity(),
            "second fetch_base_seq must reuse the existing buffer allocation"
        );
    }
    // The guard's Drop must write the (populated) store back into
    // `readers.store` both after a full drain and after an early break.
    #[test]
    fn pileup_guard_recovers_store_on_drop_even_with_early_break() {
        use std::num::NonZeroU32;
        let mut readers = Readers::open(test_bam_path(), test_fasta_path()).unwrap();
        let opts = SegmentOptions::new(NonZeroU32::new(3_000).unwrap());
        let segments: Vec<_> = readers
            .segments(("chr19", Pos0::new(6_103_500).unwrap(), Pos0::new(6_106_500).unwrap()), opts)
            .unwrap()
            .collect();
        assert!(!segments.is_empty(), "test BAM should yield at least one segment");
        {
            // Fully drain the pileup, then drop the guard.
            let mut p = readers.pileup(&segments[0]).unwrap();
            while p.pileups().is_some() {}
        }
        let cap_after_full = readers.store.records_capacity();
        assert!(cap_after_full > 0, "guard's Drop must recover the store with non-zero capacity");
        {
            let mut p = readers.pileup(&segments[0]).unwrap();
            #[allow(clippy::never_loop, reason = "intentional early break to exercise guard Drop")]
            while let Some(_col) = p.pileups() {
                break;
            }
        }
        let cap_after_early_break = readers.store.records_capacity();
        assert!(
            cap_after_early_break > 0,
            "guard's Drop must recover the store even when the loop is broken early"
        );
        assert_eq!(
            cap_after_early_break, cap_after_full,
            "early-break recovery should retain the same capacity as full-drain recovery"
        );
    }
    // Once the caller takes ownership of the store via `take_store`, the
    // guard's Drop must NOT restore it: the slot stays empty/default.
    #[test]
    fn pileup_guard_take_store_via_deref_disables_recovery() {
        use std::num::NonZeroU32;
        let mut readers = Readers::open(test_bam_path(), test_fasta_path()).unwrap();
        let opts = SegmentOptions::new(NonZeroU32::new(3_000).unwrap());
        let segments: Vec<_> = readers
            .segments(("chr19", Pos0::new(6_103_500).unwrap(), Pos0::new(6_106_500).unwrap()), opts)
            .unwrap()
            .collect();
        assert!(!segments.is_empty());
        {
            let mut p = readers.pileup(&segments[0]).unwrap();
            while p.pileups().is_some() {}
            let drained = p.take_store().expect("populated store available for the first take");
            assert!(drained.records_capacity() > 0);
        }
        assert_eq!(
            readers.store.records_capacity(),
            0,
            "after the user drains via Deref, the slot is left as an empty Default — \
             the next pileup() call allocates fresh. This is the documented contract."
        );
    }
    // A segment whose contig name does not exist in the header must be
    // rejected with SegmentHeaderMismatch carrying the bogus name.
    #[test]
    fn pileup_rejects_segment_with_mismatched_contig_name() {
        let mut readers = Readers::open(test_bam_path(), test_fasta_path()).unwrap();
        let chr19_tid = readers.header().tid("chr19").expect("test BAM has chr19");
        let real_tid =
            crate::reader::ResolveTid::resolve_tid(&chr19_tid, readers.header()).unwrap();
        let bogus_contig: SmolStr = "chr_does_not_exist".into();
        let last_pos =
            Pos0::new(u32::try_from(readers.header().target_len(chr19_tid).unwrap() - 1).unwrap())
                .unwrap();
        let segment = Segment::new(
            real_tid,
            bogus_contig.clone(),
            Pos0::new(0).unwrap(),
            Pos0::new(99).unwrap(),
            0,
            0,
            last_pos,
        )
        .unwrap();
        let err = readers.pileup(&segment).unwrap_err();
        match err {
            ReaderError::SegmentHeaderMismatch { contig, expected_tid } => {
                assert_eq!(contig, bogus_contig);
                assert_eq!(expected_tid, real_tid.as_u32());
            }
            other => panic!("expected SegmentHeaderMismatch, got {other:?}"),
        }
    }
    // A segment recording the wrong contig length must be rejected with
    // SegmentContigLengthMismatch reporting both sides of the disagreement.
    #[test]
    fn pileup_rejects_segment_with_mismatched_contig_length() {
        let mut readers = Readers::open(test_bam_path(), test_fasta_path()).unwrap();
        let chr19_tid = readers.header().tid("chr19").expect("test BAM has chr19");
        let real_tid =
            crate::reader::ResolveTid::resolve_tid(&chr19_tid, readers.header()).unwrap();
        let actual_last_pos =
            Pos0::new(u32::try_from(readers.header().target_len(chr19_tid).unwrap() - 1).unwrap())
                .unwrap();
        // Deliberately wrong: half the real last position.
        let bogus_last_pos = Pos0::new(u32::from(actual_last_pos) / 2).unwrap();
        let segment = Segment::new(
            real_tid,
            "chr19".into(),
            Pos0::new(0).unwrap(),
            Pos0::new(99).unwrap(),
            0,
            0,
            bogus_last_pos,
        )
        .unwrap();
        let err = readers.pileup(&segment).unwrap_err();
        match err {
            ReaderError::SegmentContigLengthMismatch {
                contig,
                segment_last_pos,
                header_last_pos,
            } => {
                assert_eq!(contig.as_str(), "chr19");
                assert_eq!(segment_last_pos, bogus_last_pos.as_u64());
                assert_eq!(header_last_pos, Some(actual_last_pos.as_u64()));
            }
            other => panic!("expected SegmentContigLengthMismatch, got {other:?}"),
        }
    }
    // A segment whose tid belongs to chr19 but whose name is some other
    // real contig must also fail the header cross-check.
    #[test]
    fn pileup_rejects_segment_with_mismatched_tid() {
        let mut readers = Readers::open(test_bam_path(), test_fasta_path()).unwrap();
        let chr19_tid = readers.header().tid("chr19").expect("test BAM has chr19");
        let other_tid: u32 = (0..u32::try_from(readers.header().target_count()).unwrap())
            .find(|t| *t != chr19_tid)
            .expect("test BAM has more than one contig");
        let other_name: SmolStr = readers.header().target_name(other_tid).unwrap().into();
        let chr19_real_tid =
            crate::reader::ResolveTid::resolve_tid(&chr19_tid, readers.header()).unwrap();
        let last_pos =
            Pos0::new(u32::try_from(readers.header().target_len(chr19_tid).unwrap() - 1).unwrap())
                .unwrap();
        let segment = Segment::new(
            chr19_real_tid,
            other_name,
            Pos0::new(0).unwrap(),
            Pos0::new(99).unwrap(),
            0,
            0,
            last_pos,
        )
        .unwrap();
        let err = readers.pileup(&segment).unwrap_err();
        assert!(
            matches!(err, ReaderError::SegmentHeaderMismatch { .. }),
            "expected SegmentHeaderMismatch, got {err:?}"
        );
    }
}