use std::fs::File;
use std::io::{BufReader, Cursor, Read};
use std::path::{Path, PathBuf};
use anyhow::{Context, Result, bail};
use flate2::read::MultiGzDecoder;
use noodles::core::Region;
use noodles::sam::Header;
use noodles::{bam, sam};
use rust_htslib::bam::Read as HtsRead;
use super::riker_record::{
AuxTagRequirements, BamRec, FallbackRec, HtslibRec, RikerRecord, RikerRecordRequirements,
};
const BAM_READ_BUFFER_BYTES: usize = 256 * 1024;
pub struct AlignmentReader {
inner: Inner,
header: Header,
}
enum Inner {
Sam(sam::io::Reader<BufReader<File>>),
GzippedSam(Box<sam::io::Reader<BufReader<MultiGzDecoder<File>>>>),
Bam(bam::io::Reader<noodles_bgzf::io::Reader<BufReader<File>>>),
Cram(Box<rust_htslib::bam::Reader>),
}
impl AlignmentReader {
pub fn open(path: &Path, reference: Option<&Path>) -> Result<Self> {
match detect_format(path)? {
AlignmentFormat::Sam => {
let file = File::open(path).with_context(|| open_context(path))?;
let mut reader = sam::io::Reader::new(BufReader::new(file));
let header = reader.read_header().with_context(|| header_context(path))?;
Ok(Self { inner: Inner::Sam(reader), header })
}
AlignmentFormat::GzippedSam => {
let file = File::open(path).with_context(|| open_context(path))?;
let mut reader = sam::io::Reader::new(BufReader::new(MultiGzDecoder::new(file)));
let header = reader.read_header().with_context(|| header_context(path))?;
Ok(Self { inner: Inner::GzippedSam(Box::new(reader)), header })
}
AlignmentFormat::Bam => {
let file = File::open(path).with_context(|| open_context(path))?;
let buf = BufReader::with_capacity(BAM_READ_BUFFER_BYTES, file);
let bgzf_reader = noodles_bgzf::io::Reader::new(buf);
let mut reader = bam::io::Reader::from(bgzf_reader);
let header = reader.read_header().with_context(|| header_context(path))?;
Ok(Self { inner: Inner::Bam(reader), header })
}
AlignmentFormat::Cram => match reference {
Some(ref_path) => {
let mut reader = rust_htslib::bam::Reader::from_path(path)
.with_context(|| open_context(path))?;
reader.set_reference(ref_path).with_context(|| {
format!(
"Failed to set CRAM reference {} on {}",
ref_path.display(),
path.display()
)
})?;
let header = parse_htslib_header(reader.header().as_bytes())
.with_context(|| header_context(path))?;
Ok(Self { inner: Inner::Cram(Box::new(reader)), header })
}
None => {
bail!("CRAM files require a reference FASTA (--reference): {}", path.display())
}
},
}
}
#[must_use]
pub fn header(&self) -> &Header {
&self.header
}
#[must_use]
pub fn empty_record(&self) -> RikerRecord {
match self.inner {
Inner::Bam(_) => RikerRecord::Bam(BamRec::new()),
Inner::Cram(_) => RikerRecord::Htslib(HtslibRec::new()),
Inner::Sam(_) | Inner::GzippedSam(_) => RikerRecord::Fallback(FallbackRec::empty()),
}
}
pub fn fill_record(
&mut self,
requirements: &RikerRecordRequirements,
slot: &mut RikerRecord,
) -> Result<bool> {
match (&mut self.inner, slot) {
(Inner::Bam(reader), RikerRecord::Bam(bam_rec)) => {
fill_bam_slot(reader, bam_rec, requirements)
}
(Inner::Sam(reader), RikerRecord::Fallback(fb)) => {
fill_sam_slot(reader, &self.header, fb)
}
(Inner::GzippedSam(reader), RikerRecord::Fallback(fb)) => {
fill_sam_slot(reader, &self.header, fb)
}
(Inner::Cram(reader), RikerRecord::Htslib(hts_rec)) => {
fill_htslib_slot(reader.as_mut(), hts_rec, requirements)
}
(Inner::Bam(_), _) => bail!("BAM reader requires a RikerRecord::Bam slot"),
(Inner::Cram(_), _) => bail!("CRAM reader requires a RikerRecord::Htslib slot"),
(Inner::Sam(_) | Inner::GzippedSam(_), _) => {
bail!("SAM reader requires a RikerRecord::Fallback slot")
}
}
}
pub fn riker_records<'a>(
&'a mut self,
requirements: &'a RikerRecordRequirements,
) -> Box<dyn Iterator<Item = Result<RikerRecord>> + 'a> {
let header = &self.header;
match &mut self.inner {
Inner::Bam(reader) => Box::new(BamRikerRecordIter { reader, requirements }),
Inner::Sam(reader) => Box::new(reader.record_bufs(header).map(|result| {
let buf = result.context("Failed to read SAM record")?;
Ok(RikerRecord::Fallback(FallbackRec::from_record_buf(buf)))
})),
Inner::GzippedSam(reader) => Box::new(reader.record_bufs(header).map(|result| {
let buf = result.context("Failed to read SAM record")?;
Ok(RikerRecord::Fallback(FallbackRec::from_record_buf(buf)))
})),
Inner::Cram(reader) => {
Box::new(HtslibRikerRecordIter { reader: reader.as_mut(), requirements })
}
}
}
}
pub struct IndexedAlignmentReader {
inner: IndexedInner,
header: Header,
}
enum IndexedInner {
Bam(bam::io::IndexedReader<noodles_bgzf::io::Reader<File>>),
Cram(Box<rust_htslib::bam::IndexedReader>),
}
impl IndexedAlignmentReader {
pub fn open(path: &Path, reference: Option<&Path>) -> Result<Self> {
match detect_format(path)? {
AlignmentFormat::Sam | AlignmentFormat::GzippedSam => {
bail!("SAM files cannot be indexed; use a BAM or CRAM file: {}", path.display())
}
AlignmentFormat::Bam => {
validate_bam_index_exists(path)?;
let mut reader = bam::io::indexed_reader::Builder::default()
.build_from_path(path)
.with_context(|| format!("Failed to open indexed BAM: {}", path.display()))?;
let header = reader.read_header().with_context(|| header_context(path))?;
Ok(Self { inner: IndexedInner::Bam(reader), header })
}
AlignmentFormat::Cram => match reference {
Some(ref_path) => {
validate_cram_index_exists(path)?;
let mut reader = rust_htslib::bam::IndexedReader::from_path(path)
.with_context(|| {
format!("Failed to open indexed CRAM: {}", path.display())
})?;
reader.set_reference(ref_path).with_context(|| {
format!(
"Failed to set CRAM reference {} on {}",
ref_path.display(),
path.display()
)
})?;
let header = parse_htslib_header(reader.header().as_bytes())
.with_context(|| header_context(path))?;
Ok(Self { inner: IndexedInner::Cram(Box::new(reader)), header })
}
None => {
bail!("CRAM files require a reference FASTA (--reference): {}", path.display())
}
},
}
}
#[must_use]
pub fn header(&self) -> &Header {
&self.header
}
pub fn query_for_each<F>(
&mut self,
region: &Region,
requirements: &RikerRecordRequirements,
mut f: F,
) -> Result<()>
where
F: FnMut(&RikerRecord) -> Result<()>,
{
let header = &self.header;
match &mut self.inner {
IndexedInner::Bam(reader) => {
let query = reader
.query(header, region)
.with_context(|| format!("Failed to query BAM for region: {region}"))?;
let mut record = RikerRecord::Bam(BamRec::new());
for result in query.records() {
let raw = result.context("Failed to read BAM record during query")?;
let RikerRecord::Bam(ref mut bam_rec) = record else {
unreachable!("scratch was constructed as RikerRecord::Bam");
};
bam_rec.install(raw)?;
apply_requirements(bam_rec, requirements)?;
f(&record)?;
}
}
IndexedInner::Cram(reader) => {
fetch_htslib_region(reader.as_mut(), region)?;
let mut record = RikerRecord::Htslib(HtslibRec::new());
loop {
let RikerRecord::Htslib(ref mut hts_rec) = record else {
unreachable!("scratch was constructed as RikerRecord::Htslib");
};
if !hts_rec.read_from(reader.as_mut())? {
break;
}
apply_requirements_htslib(hts_rec, requirements)?;
f(&record)?;
}
}
}
Ok(())
}
}
struct BamRikerRecordIter<'a, R: Read> {
reader: &'a mut bam::io::Reader<R>,
requirements: &'a RikerRecordRequirements,
}
impl<R: Read> Iterator for BamRikerRecordIter<'_, R> {
type Item = Result<RikerRecord>;
fn next(&mut self) -> Option<Self::Item> {
let mut bam_rec = BamRec::new();
match bam_rec.read_from(self.reader) {
Ok(0) => None,
Ok(_) => match apply_requirements(&mut bam_rec, self.requirements) {
Ok(()) => Some(Ok(RikerRecord::Bam(bam_rec))),
Err(e) => Some(Err(e)),
},
Err(e) => Some(Err(e)),
}
}
}
struct HtslibRikerRecordIter<'a> {
reader: &'a mut rust_htslib::bam::Reader,
requirements: &'a RikerRecordRequirements,
}
impl Iterator for HtslibRikerRecordIter<'_> {
type Item = Result<RikerRecord>;
fn next(&mut self) -> Option<Self::Item> {
let mut hts_rec = HtslibRec::new();
match hts_rec.read_from(self.reader) {
Ok(false) => None,
Ok(true) => match apply_requirements_htslib(&mut hts_rec, self.requirements) {
Ok(()) => Some(Ok(RikerRecord::Htslib(hts_rec))),
Err(e) => Some(Err(e)),
},
Err(e) => Some(Err(e)),
}
}
}
fn fill_bam_slot<R: Read>(
reader: &mut bam::io::Reader<R>,
bam_rec: &mut BamRec,
requirements: &RikerRecordRequirements,
) -> Result<bool> {
let n = bam_rec.read_from(reader)?;
if n == 0 {
return Ok(false);
}
apply_requirements(bam_rec, requirements)?;
Ok(true)
}
fn fill_sam_slot<R: std::io::BufRead>(
reader: &mut sam::io::Reader<R>,
header: &Header,
fb: &mut FallbackRec,
) -> Result<bool> {
match reader.read_record_buf(header, fb.inner_mut()) {
Ok(0) => Ok(false),
Ok(_) => {
fb.refresh_cache();
Ok(true)
}
Err(e) => {
fb.refresh_cache();
Err(anyhow::Error::from(e).context("Failed to read SAM record"))
}
}
}
fn apply_requirements(bam_rec: &mut BamRec, requirements: &RikerRecordRequirements) -> Result<()> {
if requirements.needs_sequence() {
bam_rec.decode_sequence();
}
match requirements.aux_tags() {
AuxTagRequirements::None => {}
AuxTagRequirements::Specific { values, presence } => {
bam_rec.scan_aux_tags(values, presence)?;
}
AuxTagRequirements::All => {
bam_rec.decode_all_aux()?;
}
}
Ok(())
}
fn fill_htslib_slot<R: HtsRead>(
reader: &mut R,
hts_rec: &mut HtslibRec,
requirements: &RikerRecordRequirements,
) -> Result<bool> {
if !hts_rec.read_from(reader)? {
return Ok(false);
}
apply_requirements_htslib(hts_rec, requirements)?;
Ok(true)
}
fn apply_requirements_htslib(
hts_rec: &mut HtslibRec,
requirements: &RikerRecordRequirements,
) -> Result<()> {
if requirements.needs_sequence() {
hts_rec.decode_sequence();
}
match requirements.aux_tags() {
AuxTagRequirements::None => {}
AuxTagRequirements::Specific { values, presence } => {
hts_rec.scan_aux_tags(values, presence)?;
}
AuxTagRequirements::All => {
hts_rec.decode_all_aux()?;
}
}
Ok(())
}
fn fetch_htslib_region(
reader: &mut rust_htslib::bam::IndexedReader,
region: &Region,
) -> Result<()> {
use noodles::core::region::Interval;
use rust_htslib::bam::FetchDefinition;
let name = region.name();
let interval: Interval = region.interval();
let start: i64 = interval
.start()
.map_or(0, |p| i64::try_from(usize::from(p) - 1).expect("genomic position fits in i64"));
let stop: i64 = interval
.end()
.map_or(i64::MAX, |p| i64::try_from(usize::from(p)).expect("genomic position fits in i64"));
reader
.fetch(FetchDefinition::RegionString(name, start, stop))
.with_context(|| format!("Failed to fetch CRAM region: {region}"))
}
fn parse_htslib_header(header_bytes: &[u8]) -> Result<Header> {
if header_bytes.is_empty() {
bail!("CRAM file has an empty SAM header — refusing to read records against it");
}
let mut parser = sam::io::Reader::new(Cursor::new(header_bytes));
parser.read_header().context("Failed to parse SAM header text from CRAM")
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AlignmentFormat {
Sam,
GzippedSam,
Bam,
Cram,
}
fn detect_format(path: &Path) -> Result<AlignmentFormat> {
let ext = path.extension().and_then(|e| e.to_str());
if ext.is_some_and(|e| e.eq_ignore_ascii_case("gz")) {
let stem = path.file_stem().unwrap_or_default();
if Path::new(stem).extension().is_some_and(|e| e.eq_ignore_ascii_case("sam")) {
return Ok(AlignmentFormat::GzippedSam);
}
}
match ext {
Some(e) if e.eq_ignore_ascii_case("sam") => Ok(AlignmentFormat::Sam),
Some(e) if e.eq_ignore_ascii_case("bam") => Ok(AlignmentFormat::Bam),
Some(e) if e.eq_ignore_ascii_case("cram") => Ok(AlignmentFormat::Cram),
_ => bail!(
"Cannot determine alignment format from extension. \
Expected .sam, .sam.gz, .bam, or .cram: {}",
path.display()
),
}
}
fn append_extension(path: &Path, suffix: &str) -> PathBuf {
let mut p = path.as_os_str().to_owned();
p.push(suffix);
PathBuf::from(p)
}
fn validate_bam_index_exists(path: &Path) -> Result<()> {
let bai_sibling = path.with_extension("bai");
let bai_suffix = append_extension(path, ".bai");
let csi = append_extension(path, ".csi");
if bai_sibling.exists() || bai_suffix.exists() || csi.exists() {
return Ok(());
}
bail!(
"BAM index not found. Expected one of:\n {}\n {}\n {}\n\
Run `samtools index {}` to create one.",
bai_suffix.display(),
bai_sibling.display(),
csi.display(),
path.display(),
);
}
fn validate_cram_index_exists(path: &Path) -> Result<()> {
let crai_sibling = path.with_extension("crai");
let crai_suffix = append_extension(path, ".crai");
if crai_suffix.exists() || crai_sibling.exists() {
return Ok(());
}
bail!(
"CRAM index not found. Expected one of:\n {}\n {}\n\
Run `samtools index {}` to create one.",
crai_suffix.display(),
crai_sibling.display(),
path.display(),
);
}
fn open_context(path: &Path) -> String {
format!("Failed to open alignment file: {}", path.display())
}
fn header_context(path: &Path) -> String {
format!("Failed to read header from: {}", path.display())
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
fn touch(path: &Path) {
fs::write(path, b"").expect("create empty file");
}
#[test]
fn validate_bam_index_finds_suffix_form() {
let dir = TempDir::new().unwrap();
let bam = dir.path().join("sample.bam");
touch(&bam);
touch(&dir.path().join("sample.bam.bai"));
validate_bam_index_exists(&bam).expect("should accept suffix-form .bam.bai");
}
#[test]
fn validate_bam_index_finds_sibling_form() {
let dir = TempDir::new().unwrap();
let bam = dir.path().join("sample.bam");
touch(&bam);
touch(&dir.path().join("sample.bai"));
validate_bam_index_exists(&bam).expect("should accept sibling-form .bai");
}
#[test]
fn validate_bam_index_finds_csi() {
let dir = TempDir::new().unwrap();
let bam = dir.path().join("sample.bam");
touch(&bam);
touch(&dir.path().join("sample.bam.csi"));
validate_bam_index_exists(&bam).expect("should accept .csi");
}
#[test]
fn validate_bam_index_missing_errors() {
let dir = TempDir::new().unwrap();
let bam = dir.path().join("sample.bam");
touch(&bam);
let err = validate_bam_index_exists(&bam).expect_err("missing index should fail");
let msg = err.to_string();
assert!(msg.contains("BAM index not found"), "unexpected error: {msg}");
assert!(msg.contains("sample.bam.bai"), "should mention suffix form: {msg}");
assert!(msg.contains("sample.bai"), "should mention sibling form: {msg}");
}
#[test]
fn validate_cram_index_finds_crai() {
let dir = TempDir::new().unwrap();
let cram = dir.path().join("sample.cram");
touch(&cram);
touch(&dir.path().join("sample.cram.crai"));
validate_cram_index_exists(&cram).expect("should accept .cram.crai");
}
#[test]
fn validate_cram_index_missing_errors() {
let dir = TempDir::new().unwrap();
let cram = dir.path().join("sample.cram");
touch(&cram);
let err = validate_cram_index_exists(&cram).expect_err("missing .crai should fail");
assert!(err.to_string().contains("CRAM index not found"));
}
}