use crate::bgzf_reader::{decompress_block_into, read_raw_blocks};
use libdeflater::Decompressor;
use std::io::{self, BufReader, Read};
/// Block count passed to `read_raw_blocks` on every buffer refill.
const BLOCKS_PER_BATCH: usize = 64;
use fgumi_raw_bam::{BAM_MAGIC, RawRecord};
/// Streaming reader that yields raw BAM records from BGZF-compressed input.
///
/// Compressed blocks are pulled from `reader` in batches and inflated into
/// `decompressed`; `position` is the cursor into that buffer.
pub struct RawBamRecordReader<R: Read> {
/// Buffered source of compressed blocks, handed to `read_raw_blocks`.
reader: BufReader<R>,
/// Decompressor instance reused for every block.
decompressor: Decompressor,
/// Decompressed bytes not yet discarded; unread data starts at `position`.
decompressed: Vec<u8>,
/// Offset into `decompressed` of the next unread byte.
position: usize,
/// Set once `read_raw_blocks` returns no blocks.
eof: bool,
/// Set by `skip_header`; `next_record` refuses to run until it is true.
header_skipped: bool,
}
impl<R: Read> RawBamRecordReader<R> {
pub fn new(reader: R) -> io::Result<Self> {
let reader = BufReader::with_capacity(256 * 1024, reader);
let mut this = Self {
reader,
decompressor: Decompressor::new(),
decompressed: Vec::with_capacity(64 * 65536), position: 0,
eof: false,
header_skipped: false,
};
this.refill_buffer()?;
if this.decompressed.len() < 4 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"File too small to contain BAM magic",
));
}
if &this.decompressed[0..4] != BAM_MAGIC {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Not a BAM file: expected magic {:?}, got {:?}",
BAM_MAGIC,
&this.decompressed[0..4]
),
));
}
this.position = 4;
Ok(this)
}
pub fn from_buf_reader(reader: BufReader<R>) -> io::Result<Self> {
let mut this = Self {
reader,
decompressor: Decompressor::new(),
decompressed: Vec::with_capacity(64 * 65536),
position: 0,
eof: false,
header_skipped: false,
};
this.refill_buffer()?;
if this.decompressed.len() < 4 || &this.decompressed[0..4] != BAM_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Not a BAM file"));
}
this.position = 4;
Ok(this)
}
/// Consumes the BAM header and returns its raw bytes.
///
/// The returned buffer holds, in order: `l_text`, the header text, `n_ref`, and for each
/// reference `l_name`, the name bytes and `l_ref`. All integers are little-endian `u32`s,
/// reproduced exactly as read. The magic is not included; the constructor already
/// consumed it.
///
/// # Errors
///
/// - An error if the header was already skipped.
/// - [`io::ErrorKind::UnexpectedEof`] if the input ends inside the header.
/// - Any error raised while reading or decompressing blocks.
pub fn skip_header(&mut self) -> io::Result<Vec<u8>> {
    if self.header_skipped {
        return Err(io::Error::other("Header already skipped"));
    }
    self.ensure_data()?;
    let mut header_bytes = Vec::new();

    // Each length is kept as the decoded `u32` for re-serialisation and only widened
    // to `usize` for indexing, so no narrowing cast is needed.
    let l_text = self.read_u32()?;
    header_bytes.extend_from_slice(&l_text.to_le_bytes());
    let l_text = l_text as usize;
    if !self.ensure_bytes(l_text)? {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "Truncated BAM header (text)",
        ));
    }
    header_bytes.extend_from_slice(&self.decompressed[self.position..self.position + l_text]);
    self.position += l_text;

    let n_ref = self.read_u32()?;
    header_bytes.extend_from_slice(&n_ref.to_le_bytes());
    for _ in 0..n_ref {
        let l_name = self.read_u32()?;
        header_bytes.extend_from_slice(&l_name.to_le_bytes());
        let l_name = l_name as usize;
        if !self.ensure_bytes(l_name)? {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Truncated BAM header (ref name)",
            ));
        }
        header_bytes
            .extend_from_slice(&self.decompressed[self.position..self.position + l_name]);
        self.position += l_name;

        let l_ref = self.read_u32()?;
        header_bytes.extend_from_slice(&l_ref.to_le_bytes());
    }

    self.header_skipped = true;
    Ok(header_bytes)
}
/// Reads the next alignment record, or returns `Ok(None)` at a clean end of input.
///
/// The returned record holds the `block_size` bytes that follow the 4-byte length
/// prefix; the prefix itself is not included.
///
/// # Errors
///
/// - An error if `skip_header` has not been called yet.
/// - [`io::ErrorKind::InvalidData`] if `block_size` is smaller than 32.
/// - [`io::ErrorKind::UnexpectedEof`] if the input ends inside a length prefix or inside
///   a record body.
/// - Any error raised while reading or decompressing blocks.
pub fn next_record(&mut self) -> io::Result<Option<RawRecord>> {
    if !self.header_skipped {
        return Err(io::Error::other("Must call skip_header() first"));
    }
    if !self.ensure_bytes(4)? {
        // Only a fully drained buffer is a clean EOF. One to three leftover bytes are a
        // cut-off length prefix and must not be silently dropped.
        if self.position < self.decompressed.len() {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Truncated BAM record"));
        }
        return Ok(None);
    }

    // Peek the length prefix without consuming it.
    let start = self.position;
    let mut prefix = [0u8; 4];
    prefix.copy_from_slice(&self.decompressed[start..start + 4]);
    let block_size = u32::from_le_bytes(prefix) as usize;
    // A record body holds at least the 32 bytes of fixed-length fields.
    if block_size < 32 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Invalid BAM block_size: {block_size}"),
        ));
    }
    // Checked so that a hostile length cannot wrap `usize` on 32-bit targets.
    let total_size = block_size.checked_add(4).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Invalid BAM block_size: {block_size}"),
        )
    })?;
    if !self.ensure_bytes(total_size)? {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Truncated BAM record"));
    }

    // `ensure_bytes` may have compacted the buffer, so the cursor is read again here.
    let start = self.position;
    let record = RawRecord::from(self.decompressed[start + 4..start + total_size].to_vec());
    self.position = start + total_size;
    Ok(Some(record))
}
/// Consumes four bytes at the cursor and decodes them as a little-endian `u32`.
///
/// Returns [`io::ErrorKind::UnexpectedEof`] if fewer than four bytes remain in the input.
fn read_u32(&mut self) -> io::Result<u32> {
    if self.ensure_bytes(4)? {
        let start = self.position;
        let mut word = [0u8; 4];
        word.copy_from_slice(&self.decompressed[start..start + 4]);
        self.position = start + 4;
        Ok(u32::from_le_bytes(word))
    } else {
        Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "Unexpected EOF while reading u32",
        ))
    }
}
/// Refills the buffer until at least `n` unread bytes are available at the cursor.
///
/// Returns `Ok(true)` once `n` bytes are buffered, or `Ok(false)` if the input ended
/// first. A refill may compact the buffer and reset `position`, so callers must read
/// `self.position` again after this returns.
fn ensure_bytes(&mut self, n: usize) -> io::Result<bool> {
    // Compare against the remaining byte count rather than computing `position + n`:
    // `n` comes from the file and the sum could wrap `usize` on 32-bit targets.
    while self.decompressed.len().saturating_sub(self.position) < n {
        if self.eof {
            return Ok(false);
        }
        self.refill_buffer()?;
    }
    Ok(true)
}
/// Triggers one refill when the buffer is fully consumed and the input has not ended.
fn ensure_data(&mut self) -> io::Result<()> {
    let drained = self.position >= self.decompressed.len();
    if self.eof || !drained {
        return Ok(());
    }
    self.refill_buffer()
}
/// Discards consumed bytes, then appends one more batch of decompressed blocks.
///
/// Unread bytes are moved to the front of the buffer and `position` is reset to 0.
/// `BLOCKS_PER_BATCH` blocks are then requested from `read_raw_blocks`; an empty result
/// marks end of input by setting `eof`.
fn refill_buffer(&mut self) -> io::Result<()> {
    // Dropping the consumed prefix shifts the unread tail down to index 0.
    let consumed = std::mem::take(&mut self.position);
    self.decompressed.drain(..consumed);

    let blocks = read_raw_blocks(&mut self.reader, BLOCKS_PER_BATCH)?;
    if blocks.is_empty() {
        self.eof = true;
    } else {
        for block in &blocks {
            decompress_block_into(block, &mut self.decompressor, &mut self.decompressed)?;
        }
    }
    Ok(())
}
}
/// Iterator adapter over `next_record`.
///
/// Iteration ends when `next_record` returns `Ok(None)`. Errors are yielded as
/// `Some(Err(_))` and do not end iteration: the following call invokes `next_record`
/// again.
impl<R: Read> Iterator for RawBamRecordReader<R> {
    type Item = io::Result<RawRecord>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_record().transpose()
    }
}
/// Wrapper around [`RawBamRecordReader`] that hands out records in batches.
pub struct BatchedRawBamReader<R: Read> {
/// Underlying record-at-a-time reader.
inner: RawBamRecordReader<R>,
/// Upper bound on the number of records returned by one `next_batch` call.
batch_size: usize,
}
impl<R: Read> BatchedRawBamReader<R> {
    /// Creates a batched reader that returns at most `batch_size` records per batch.
    ///
    /// # Errors
    ///
    /// - [`io::ErrorKind::InvalidInput`] if `batch_size` is zero. A zero-sized batch
    ///   would make `next_batch` report end of input while records are still unread.
    /// - Any error from [`RawBamRecordReader::new`].
    pub fn new(reader: R, batch_size: usize) -> io::Result<Self> {
        if batch_size == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "batch_size must be greater than zero",
            ));
        }
        let inner = RawBamRecordReader::new(reader)?;
        Ok(Self { inner, batch_size })
    }

    /// Consumes the BAM header and returns its raw bytes; forwards to the inner reader.
    pub fn skip_header(&mut self) -> io::Result<Vec<u8>> {
        self.inner.skip_header()
    }

    /// Reads up to `batch_size` records.
    ///
    /// Returns `Ok(None)` when no records are left. The last batch may be shorter than
    /// `batch_size`. If reading a record fails, the error is returned and the records
    /// already collected for that batch are dropped.
    pub fn next_batch(&mut self) -> io::Result<Option<Vec<RawRecord>>> {
        let mut batch = Vec::with_capacity(self.batch_size);
        while batch.len() < self.batch_size {
            let Some(record) = self.inner.next_record()? else { break };
            batch.push(record);
        }
        Ok(if batch.is_empty() { None } else { Some(batch) })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
/// Builds an in-memory BGZF-compressed BAM stream for tests.
///
/// The uncompressed payload is: BAM magic, `l_text` + header text, `n_ref`, then per
/// reference `l_name` + NUL-terminated name + `l_ref`, then each record prefixed with
/// its length. All integers are little-endian `u32`s.
#[allow(clippy::cast_possible_truncation)]
fn build_test_bam(header_text: &str, refs: &[(&str, u32)], records: &[Vec<u8>]) -> Vec<u8> {
    /// Appends `value` as a little-endian `u32`; test inputs are far below `u32::MAX`.
    fn push_len(buf: &mut Vec<u8>, value: usize) {
        buf.extend_from_slice(&(value as u32).to_le_bytes());
    }

    let mut payload = Vec::new();
    payload.extend_from_slice(BAM_MAGIC);

    push_len(&mut payload, header_text.len());
    payload.extend_from_slice(header_text.as_bytes());

    push_len(&mut payload, refs.len());
    for &(name, length) in refs {
        // The stored name length includes the trailing NUL.
        push_len(&mut payload, name.len() + 1);
        payload.extend_from_slice(name.as_bytes());
        payload.push(0);
        payload.extend_from_slice(&length.to_le_bytes());
    }

    for record in records {
        push_len(&mut payload, record.len());
        payload.extend_from_slice(record);
    }

    let mut compressed = Vec::new();
    {
        let mut writer = noodles_bgzf::io::Writer::new(&mut compressed);
        writer.write_all(&payload).expect("failed to write raw BAM data to BGZF");
        writer.finish().expect("failed to finish BGZF writer");
    }
    compressed
}
/// Builds a minimal BAM alignment record body (without the `block_size` prefix).
///
/// The record is 32 fixed bytes followed by the NUL-terminated read name. Bytes 0..4
/// and 4..8 (refID and pos in the BAM layout) are set to -1, byte 8 holds
/// `l_read_name`, and every other fixed byte is zero.
///
/// # Panics
///
/// Panics if `name` is longer than 254 bytes, because `l_read_name` (name length plus
/// the NUL) must fit in a single byte.
fn make_minimal_record(name: &[u8]) -> Vec<u8> {
    // Checked conversion: a truncating cast would store a wrong length and only fail
    // later with an unrelated slice-bounds panic.
    let l_read_name = u8::try_from(name.len() + 1)
        .expect("read name too long: l_read_name must fit in a u8");
    let mut rec = vec![0u8; 32 + usize::from(l_read_name)];
    rec[0..4].copy_from_slice(&(-1i32).to_le_bytes());
    rec[4..8].copy_from_slice(&(-1i32).to_le_bytes());
    rec[8] = l_read_name;
    // The trailing NUL is already present because the buffer is zero-initialised.
    rec[32..32 + name.len()].copy_from_slice(name);
    rec
}
/// Pins the refill batch size so that a change to it is a deliberate one.
#[test]
fn test_blocks_per_batch() {
    let expected: usize = 64;
    assert_eq!(BLOCKS_PER_BATCH, expected);
}
/// A header-only BAM stream must be accepted by the constructor.
#[test]
fn test_new_valid_bam() {
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &[]);
    let failure = RawBamRecordReader::new(io::Cursor::new(bam)).err();
    assert!(failure.is_none(), "Expected valid BAM to succeed: {:?}", failure);
}
/// Valid BGZF whose payload does not start with the BAM magic must be rejected.
#[test]
fn test_new_invalid_magic() {
    let mut bgzf = Vec::new();
    {
        let mut writer = noodles_bgzf::io::Writer::new(&mut bgzf);
        writer.write_all(b"NOT_BAM!").expect("failed to write non-BAM data to BGZF");
        writer.finish().expect("failed to finish BGZF writer for invalid magic test");
    }

    let err = match RawBamRecordReader::new(io::Cursor::new(bgzf)) {
        Err(err) => err,
        Ok(_) => unreachable!("Expected error for invalid magic"),
    };
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    let message = err.to_string();
    assert!(message.contains("Not a BAM file"), "Unexpected error message: {err}");
}
/// A BGZF stream with no payload must be rejected by the constructor.
#[test]
fn test_new_empty_input() {
    let mut bgzf = Vec::new();
    {
        let writer = noodles_bgzf::io::Writer::new(&mut bgzf);
        writer.finish().expect("failed to finish BGZF writer for empty input test");
    }

    let err = match RawBamRecordReader::new(io::Cursor::new(bgzf)) {
        Err(err) => err,
        Ok(_) => unreachable!("Expected error for empty input"),
    };
    let message = err.to_string();
    let recognised = ["File too small", "Not a BAM file"]
        .iter()
        .any(|&needle| message.contains(needle));
    assert!(recognised, "Unexpected error message: {err}");
}
/// `skip_header` must return the complete header byte-for-byte: `l_text`, the text,
/// `n_ref`, and every reference entry (`l_name`, NUL-terminated name, `l_ref`), with
/// nothing left over.
#[test]
fn test_skip_header_returns_bytes() {
    /// Decodes the little-endian `u32` at `*cursor` and advances the cursor past it.
    fn take_u32(bytes: &[u8], cursor: &mut usize, what: &str) -> u32 {
        let word: [u8; 4] = bytes
            .get(*cursor..*cursor + 4)
            .unwrap_or_else(|| panic!("header_bytes too short for {what}"))
            .try_into()
            .expect("slice must be exactly 4 bytes");
        *cursor += 4;
        u32::from_le_bytes(word)
    }

    let header_text = "@HD\tVN:1.6\n";
    let refs = [("chr1", 1000u32), ("chr2", 2000u32)];
    let data = build_test_bam(header_text, &refs, &[]);
    let mut reader = RawBamRecordReader::new(io::Cursor::new(data))
        .expect("failed to create reader for valid BAM");
    let header_bytes = reader.skip_header().expect("failed to skip BAM header");
    assert!(!header_bytes.is_empty());

    let mut cursor = 0;
    let l_text = take_u32(&header_bytes, &mut cursor, "l_text") as usize;
    assert_eq!(l_text, header_text.len());
    assert_eq!(&header_bytes[cursor..cursor + l_text], header_text.as_bytes());
    cursor += l_text;

    let n_ref = take_u32(&header_bytes, &mut cursor, "n_ref");
    assert_eq!(n_ref, 2);

    for &(name, length) in &refs {
        let l_name = take_u32(&header_bytes, &mut cursor, "l_name") as usize;
        assert_eq!(l_name, name.len() + 1, "l_name must include the trailing NUL");
        assert_eq!(&header_bytes[cursor..cursor + name.len()], name.as_bytes());
        assert_eq!(header_bytes[cursor + name.len()], 0, "reference name must be NUL-terminated");
        cursor += l_name;

        let l_ref = take_u32(&header_bytes, &mut cursor, "l_ref");
        assert_eq!(l_ref, length);
    }

    assert_eq!(cursor, header_bytes.len(), "header_bytes has unexpected trailing data");
}
/// A second `skip_header` call must fail rather than parse record data as a header.
#[test]
fn test_skip_header_twice_errors() {
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &[]);
    let mut reader = RawBamRecordReader::new(io::Cursor::new(bam))
        .expect("failed to create reader for skip_header test");
    reader.skip_header().expect("first skip_header should succeed");

    let second = reader.skip_header();
    assert!(second.is_err());
    let message = second.unwrap_err().to_string();
    assert!(message.contains("already skipped"), "Expected 'already skipped' error");
}
/// Reading a record before the header has been skipped must fail.
#[test]
fn test_next_record_without_skip_header_errors() {
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &[]);
    let mut reader = RawBamRecordReader::new(io::Cursor::new(bam))
        .expect("failed to create reader for next_record test");

    let outcome = reader.next_record();
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("skip_header"), "Expected error about skip_header");
}
/// One record in, the same record out, then end of input.
#[test]
fn test_read_single_record() {
    let expected = make_minimal_record(b"R");
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], std::slice::from_ref(&expected));
    let mut reader = RawBamRecordReader::new(io::Cursor::new(bam))
        .expect("failed to create reader for single record test");
    reader.skip_header().expect("failed to skip header");

    let first = reader.next_record().expect("failed to read first record");
    assert!(first.is_some(), "Expected one record");
    let first = first.expect("record should be Some");
    assert_eq!(first.as_ref(), expected.as_slice(), "Record bytes should match");

    let after = reader.next_record().expect("failed to read at EOF");
    assert!(after.is_none(), "Expected EOF after single record");
}
/// Records must come back in order and unchanged, followed by end of input.
#[test]
fn test_read_multiple_records() {
    let expected =
        [make_minimal_record(b"A"), make_minimal_record(b"B"), make_minimal_record(b"C")];
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &expected);
    let mut reader = RawBamRecordReader::new(io::Cursor::new(bam))
        .expect("failed to create reader for multiple records test");
    reader.skip_header().expect("failed to skip header");

    // Read every record before comparing, so a read failure is reported ahead of any
    // content mismatch.
    let actual: Vec<_> = (1..=expected.len())
        .map(|n| {
            reader
                .next_record()
                .unwrap_or_else(|e| panic!("failed to read record {n}: {e:?}"))
                .unwrap_or_else(|| panic!("record {n}"))
        })
        .collect();
    for (got, want) in actual.iter().zip(&expected) {
        assert_eq!(got.as_ref(), want.as_slice());
    }

    assert!(reader.next_record().expect("failed to read at EOF").is_none(), "Expected EOF");
}
/// Driving the reader through its `Iterator` impl yields every record in order.
#[test]
fn test_iterator_adapter() {
    let expected = [make_minimal_record(b"X"), make_minimal_record(b"Y")];
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &expected);
    let mut reader = RawBamRecordReader::new(io::Cursor::new(bam))
        .expect("failed to create reader for iterator test");
    reader.skip_header().expect("failed to skip header");

    let mut collected = Vec::new();
    for item in reader {
        collected.push(item.expect("failed to read record via iterator"));
    }

    assert_eq!(collected.len(), 2);
    assert_eq!(collected[0].as_ref(), expected[0].as_slice());
    assert_eq!(collected[1].as_ref(), expected[1].as_slice());
}
/// Three records with a batch size of two: one full batch, one short batch, then `None`.
#[test]
fn test_batched_reader() {
    let expected =
        [make_minimal_record(b"A"), make_minimal_record(b"B"), make_minimal_record(b"C")];
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &expected);
    let mut reader = BatchedRawBamReader::new(io::Cursor::new(bam), 2)
        .expect("failed to create batched reader");
    reader.skip_header().expect("failed to skip header");

    let full = reader.next_batch().expect("failed to read batch 1").expect("batch 1");
    assert_eq!(full.len(), 2);
    assert_eq!(full[0].as_ref(), expected[0].as_slice());
    assert_eq!(full[1].as_ref(), expected[1].as_slice());

    let short = reader.next_batch().expect("failed to read batch 2").expect("batch 2");
    assert_eq!(short.len(), 1);
    assert_eq!(short[0].as_ref(), expected[2].as_slice());

    let exhausted = reader.next_batch().expect("failed to read at end of batches");
    assert!(exhausted.is_none(), "Expected no more batches");
}
/// The `BufReader`-based constructor must behave like `new` on a header-only stream.
#[test]
fn test_from_buf_reader() {
    let bam = build_test_bam("@HD\tVN:1.6\n", &[("chr1", 500)], &[]);
    let mut reader =
        RawBamRecordReader::from_buf_reader(BufReader::new(io::Cursor::new(bam)))
            .expect("failed to create reader from BufReader");

    let header_bytes = reader.skip_header().expect("failed to skip header");
    assert!(!header_bytes.is_empty());

    let after_header = reader.next_record().expect("failed to read at EOF");
    assert!(after_header.is_none());
}
/// With no records, `next_batch` must return `None`, and keep doing so when called again.
#[test]
fn test_batched_reader_eof() {
    let bam = build_test_bam("@HD\tVN:1.6\n", &[], &[]);
    let mut reader = BatchedRawBamReader::new(io::Cursor::new(bam), 10)
        .expect("failed to create batched reader for empty BAM");
    reader.skip_header().expect("failed to skip header");

    let first = reader.next_batch().expect("failed to read first batch");
    assert!(first.is_none(), "Expected None for empty BAM");

    let second = reader.next_batch().expect("failed to read second batch");
    assert!(second.is_none(), "Expected None on repeated call");
}
}