use anyhow::{Context, Result};
use noodles::bam::bai;
use noodles::bgzf::VirtualPosition;
use noodles::core::Position;
use noodles::sam::Header;
use noodles_bgzf::io::{MultithreadedReader, Reader as BgzfReader, Writer as BgzfWriter};
use crate::vendored::{BlockInfoRx, MultithreadedWriter, MultithreadedWriterBuilder};
use fgumi_raw_bam::RawBamReader;
use bgzf::CompressionLevel;
use noodles_csi::binning_index::Indexer;
use noodles_csi::binning_index::index::reference_sequence::bin::Chunk;
use noodles_csi::binning_index::index::reference_sequence::index::LinearIndex;
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead, Read, Write};
use std::num::NonZero;
use std::path::Path;
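/// Maximum uncompressed payload per BGZF block (0xFF00 bytes, the conventional
/// htslib limit); used to detect when a record's end offset spills into the next block.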
const MAX_BLOCK_SIZE: usize = 65280;
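/// A BGZF reader that is either single-threaded or backed by a multithreaded
/// decompression pool, selected at runtime from the requested thread count.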
pub enum BgzfReaderEnum {
SingleThreaded(BgzfReader<Box<dyn Read + Send>>),
MultiThreaded(MultithreadedReader<Box<dyn Read + Send>>),
}
impl Read for BgzfReaderEnum {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
BgzfReaderEnum::SingleThreaded(r) => r.read(buf),
BgzfReaderEnum::MultiThreaded(r) => r.read(buf),
}
}
}
impl BufRead for BgzfReaderEnum {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
match self {
BgzfReaderEnum::SingleThreaded(r) => r.fill_buf(),
BgzfReaderEnum::MultiThreaded(r) => r.fill_buf(),
}
}
fn consume(&mut self, amt: usize) {
match self {
BgzfReaderEnum::SingleThreaded(r) => r.consume(amt),
BgzfReaderEnum::MultiThreaded(r) => r.consume(amt),
}
}
}
pub type BamReaderAuto = noodles::bam::io::Reader<BgzfReaderEnum>;
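/// A BGZF writer that is either single-threaded or backed by the vendored
/// multithreaded compressor, selected at runtime from the requested thread count.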
pub enum BgzfWriterEnum {
SingleThreaded(BgzfWriter<Box<dyn Write + Send>>),
MultiThreaded(MultithreadedWriter<Box<dyn Write + Send>>),
}
impl Write for BgzfWriterEnum {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
BgzfWriterEnum::SingleThreaded(w) => w.write(buf),
BgzfWriterEnum::MultiThreaded(w) => w.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match self {
BgzfWriterEnum::SingleThreaded(w) => w.flush(),
BgzfWriterEnum::MultiThreaded(w) => w.flush(),
}
}
}
impl BgzfWriterEnum {
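    /// Finalizes the underlying BGZF stream, flushing any buffered data.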
pub fn finish(self) -> io::Result<()> {
match self {
            BgzfWriterEnum::SingleThreaded(w) => {
                // finish() flushes buffered data and writes the final BGZF EOF block,
                // surfacing errors that relying on Drop alone would swallow.
                w.finish()?;
                Ok(())
            }
BgzfWriterEnum::MultiThreaded(mut w) => {
w.finish().map_err(|e| io::Error::other(e.to_string()))?;
Ok(())
}
}
}
}
pub type BamWriter = noodles::bam::io::Writer<BgzfWriterEnum>;
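/// Wraps `reader` in a multithreaded BGZF reader when `threads > 1`, otherwise in a
/// single-threaded one.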
fn make_bgzf_reader(reader: Box<dyn Read + Send>, threads: usize) -> BgzfReaderEnum {
if threads > 1 {
let worker_count = NonZero::new(threads).expect("threads > 1 checked above");
BgzfReaderEnum::MultiThreaded(MultithreadedReader::with_worker_count(worker_count, reader))
} else {
BgzfReaderEnum::SingleThreaded(BgzfReader::new(reader))
}
}
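/// Wraps `output` in a BGZF writer, multithreaded when `threads > 1`, applying
/// `compression_level` when the backend accepts it and falling back to its default
/// otherwise.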
#[allow(clippy::cast_possible_truncation)]
fn make_bgzf_writer(
output: Box<dyn Write + Send>,
threads: usize,
compression_level: u32,
) -> BgzfWriterEnum {
if threads > 1 {
let worker_count = NonZero::new(threads).expect("threads > 1 checked above");
let mut builder = MultithreadedWriterBuilder::default().set_worker_count(worker_count);
if let Ok(cl) = CompressionLevel::new(compression_level as u8) {
builder = builder.set_compression_level(cl);
}
BgzfWriterEnum::MultiThreaded(builder.build_from_writer(output))
} else {
let level = noodles_bgzf::io::writer::CompressionLevel::new(compression_level as u8)
.unwrap_or_default();
let writer = noodles_bgzf::io::writer::Builder::default()
.set_compression_level(level)
.build_from_writer(output);
BgzfWriterEnum::SingleThreaded(writer)
}
}
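/// Writes an uncompressed BAM header: the `BAM\x01` magic, `l_text` plus the SAM
/// header text, then `n_ref` followed by each reference name (NUL-terminated) and
/// length, all little-endian per the BAM specification.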
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
pub(crate) fn write_bam_header(writer: &mut impl Write, header: &Header) -> io::Result<()> {
writer.write_all(fgumi_raw_bam::BAM_MAGIC)?;
let mut sam_writer = noodles::sam::io::Writer::new(Vec::new());
sam_writer.write_header(header)?;
let header_bytes = sam_writer.into_inner();
let l_text = header_bytes.len() as i32;
writer.write_all(&l_text.to_le_bytes())?;
writer.write_all(&header_bytes)?;
let n_ref = header.reference_sequences().len() as i32;
writer.write_all(&n_ref.to_le_bytes())?;
for (name, map) in header.reference_sequences() {
let l_name = (name.len() + 1) as u32;
writer.write_all(&l_name.to_le_bytes())?;
writer.write_all(name)?;
writer.write_all(&[0u8])?;
let l_ref = map.length().get() as i32;
writer.write_all(&l_ref.to_le_bytes())?;
}
Ok(())
}
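/// Writes pre-encoded BAM records directly to a BGZF stream without re-encoding:
/// `write_raw_record` adds the `block_size` prefix, `write_raw_bytes` copies bytes
/// verbatim.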
pub struct RawBamWriter {
inner: BgzfWriterEnum,
}
impl RawBamWriter {
#[must_use]
pub fn new(inner: BgzfWriterEnum) -> Self {
Self { inner }
}
pub fn write_header(&mut self, header: &Header) -> io::Result<()> {
write_bam_header(&mut self.inner, header)
}
#[inline]
#[allow(clippy::cast_possible_truncation)]
pub fn write_raw_record(&mut self, record_bytes: &[u8]) -> io::Result<()> {
use std::io::Write;
let block_size = record_bytes.len() as u32;
self.inner.write_all(&block_size.to_le_bytes())?;
self.inner.write_all(record_bytes)
}
pub fn write_raw_bytes(&mut self, data: &[u8]) -> io::Result<()> {
use std::io::Write;
self.inner.write_all(data)
}
pub fn finish(self) -> io::Result<()> {
self.inner.finish()
}
}
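/// Creates a [`RawBamWriter`] for `path` (or stdout when the path is `-`) and writes
/// the BAM header.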
pub fn create_raw_bam_writer<P: AsRef<Path>>(
path: P,
header: &Header,
threads: usize,
compression_level: u32,
) -> Result<RawBamWriter> {
let path_ref = path.as_ref();
let output = open_output_writer(path_ref)?;
let bgzf_writer = make_bgzf_writer(output, threads, compression_level);
let mut writer = RawBamWriter::new(bgzf_writer);
writer
.write_header(header)
.with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
Ok(writer)
}
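/// A written record whose BGZF virtual offsets are not yet known because its
/// compressed block has not been reported; resolved in `flush_completed_blocks`.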
#[derive(Debug)]
struct CachedIndexEntry {
block_number: u64,
offset_in_block: usize,
record_len: usize,
alignment_context: Option<(usize, Position, Position, bool)>,
}
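/// A BAM writer that builds a BAI index while writing. Records go through a
/// multithreaded BGZF writer; compressed block start offsets are reported back on
/// `block_info_rx`, and cached entries are turned into index chunks once their block
/// positions are known.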
pub struct IndexingBamWriter {
inner: MultithreadedWriter<File>,
block_info_rx: BlockInfoRx,
indexer: Indexer<LinearIndex>,
num_refs: usize,
entry_cache: Vec<CachedIndexEntry>,
block_positions: HashMap<u64, u64>,
current_block_number: u64,
current_block_offset: usize,
}
impl IndexingBamWriter {
fn new(inner: MultithreadedWriter<File>, num_refs: usize) -> Self {
let block_info_rx = inner
.block_info_receiver()
.expect("block_info_receiver must be available for IndexingBamWriter")
.clone();
Self {
current_block_number: inner.current_block_number(),
current_block_offset: inner.buffer_offset(),
inner,
block_info_rx,
indexer: Indexer::default(),
num_refs,
entry_cache: Vec::new(),
block_positions: HashMap::new(),
}
}
fn write_header(&mut self, header: &Header) -> io::Result<()> {
write_bam_header(&mut self.inner, header)?;
self.inner.flush()?;
self.current_block_number = self.inner.current_block_number();
self.current_block_offset = self.inner.buffer_offset();
Ok(())
}
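    /// Writes one pre-encoded record (without its `block_size` prefix) and caches the
    /// information needed to index it once its block's compressed offset is known.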
#[inline]
#[allow(clippy::cast_possible_truncation)]
pub fn write_raw_record(&mut self, record_bytes: &[u8]) -> io::Result<()> {
let block_number = self.current_block_number;
let offset_in_block = self.current_block_offset;
let block_size = record_bytes.len() as u32;
self.inner.write_all(&block_size.to_le_bytes())?;
self.inner.write_all(record_bytes)?;
let record_len = 4 + record_bytes.len();
self.current_block_offset += record_len;
let new_block_number = self.inner.current_block_number();
if new_block_number > self.current_block_number {
self.current_block_number = new_block_number;
self.current_block_offset = self.inner.buffer_offset();
}
let alignment_context = Self::extract_alignment_context(record_bytes);
self.entry_cache.push(CachedIndexEntry {
block_number,
offset_in_block,
record_len,
alignment_context,
});
self.flush_completed_blocks()?;
Ok(())
}
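    /// Drains newly reported block offsets and converts cached entries into index
    /// chunks, leaving in the cache any entry whose start (or, for records spilling
    /// into the next block, end) position is not yet known.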
#[allow(clippy::cast_possible_truncation)]
fn flush_completed_blocks(&mut self) -> io::Result<()> {
while let Ok(info) = self.block_info_rx.try_recv() {
self.block_positions.insert(info.block_number, info.compressed_start);
}
let mut i = 0;
while i < self.entry_cache.len() {
let entry = &self.entry_cache[i];
if let Some(&block_start) = self.block_positions.get(&entry.block_number) {
let start_vpos =
VirtualPosition::try_from((block_start, entry.offset_in_block as u16))
.unwrap_or(VirtualPosition::MIN);
let end_offset = entry.offset_in_block + entry.record_len;
let end_vpos = if end_offset <= MAX_BLOCK_SIZE {
VirtualPosition::try_from((block_start, end_offset as u16))
.unwrap_or(VirtualPosition::MIN)
} else {
let next_block = entry.block_number + 1;
if let Some(&next_start) = self.block_positions.get(&next_block) {
let overflow = end_offset - MAX_BLOCK_SIZE;
VirtualPosition::try_from((next_start, overflow as u16))
.unwrap_or(VirtualPosition::MIN)
} else {
i += 1;
continue;
}
};
let chunk = Chunk::new(start_vpos, end_vpos);
if let Some(ctx) = entry.alignment_context {
self.indexer.add_record(Some(ctx), chunk).map_err(io::Error::other)?;
} else {
self.indexer.add_record(None, chunk).map_err(io::Error::other)?;
}
self.entry_cache.remove(i);
} else {
i += 1;
}
}
Ok(())
}
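    /// Extracts `(reference id, alignment start, alignment end, mapped)` from a raw
    /// record for the indexer, returning `None` for unmapped or unplaced records.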
#[inline]
#[allow(clippy::cast_sign_loss, clippy::cast_possible_wrap)]
fn extract_alignment_context(bam: &[u8]) -> Option<(usize, Position, Position, bool)> {
let v = fgumi_raw_bam::RawRecordView::new(bam);
let tid = v.ref_id();
let pos = v.pos();
let flags = v.flags();
let is_unmapped = (flags & fgumi_raw_bam::flags::UNMAPPED) != 0;
if tid < 0 || is_unmapped {
return None;
}
let ref_len = fgumi_raw_bam::reference_length_from_raw_bam(bam);
let start = Position::try_from((pos + 1) as usize).ok()?;
let end = Position::try_from((pos + ref_len) as usize).ok()?;
Some((tid as usize, start, end, true))
}
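    /// Flushes the writer, waits briefly for outstanding block offsets so remaining
    /// cached entries can be indexed, finalizes the BGZF stream, and builds the BAI
    /// index.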
pub fn finish(mut self) -> io::Result<bai::Index> {
self.inner.flush()?;
for _ in 0..100 {
self.flush_completed_blocks()?;
if self.entry_cache.is_empty() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
let _ = self.inner.finish().map_err(|e| io::Error::other(e.to_string()))?;
self.flush_completed_blocks()?;
if !self.entry_cache.is_empty() {
return Err(io::Error::other(format!(
"Unflushed index entries remain: {} entries",
self.entry_cache.len()
)));
}
let index = self.indexer.build(self.num_refs);
Ok(index)
}
}
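/// Creates a BAM writer for `path` that builds a BAI index while writing. Stdout is
/// rejected because indexing requires a seekable file.
///
/// Illustrative sketch (paths and record bytes are placeholders):
///
/// ```ignore
/// // compression level 6, four compression threads
/// let mut writer = create_indexing_bam_writer("out.bam", &header, 6, 4)?;
/// writer.write_raw_record(&record_bytes)?;
/// let index = writer.finish()?;
/// write_bai_index("out.bam.bai", &index)?;
/// ```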
pub fn create_indexing_bam_writer<P: AsRef<Path>>(
path: P,
header: &Header,
compression_level: u32,
threads: usize,
) -> Result<IndexingBamWriter> {
let path_ref = path.as_ref();
if is_stdout_path(path_ref) {
anyhow::bail!(
"Cannot create an indexing BAM writer for stdout (indexing requires a seekable file)"
);
}
let output_file = File::create(path_ref)
.with_context(|| format!("Failed to create output BAM: {}", path_ref.display()))?;
let worker_count = NonZero::new(threads.max(1)).expect("threads.max(1) >= 1");
let mut builder = MultithreadedWriterBuilder::default().set_worker_count(worker_count);
#[allow(clippy::cast_possible_truncation)]
if let Ok(cl) = CompressionLevel::new(compression_level as u8) {
builder = builder.set_compression_level(cl);
}
let bgzf_writer = builder.build_from_writer(output_file);
let num_refs = header.reference_sequences().len();
let mut writer = IndexingBamWriter::new(bgzf_writer, num_refs);
writer
.write_header(header)
.with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
Ok(writer)
}
pub fn write_bai_index<P: AsRef<Path>>(path: P, index: &bai::Index) -> Result<()> {
let path_ref = path.as_ref();
let file = File::create(path_ref)
.with_context(|| format!("Failed to create index file: {}", path_ref.display()))?;
let mut writer = bai::io::Writer::new(file);
writer
.write_index(index)
.with_context(|| format!("Failed to write index to: {}", path_ref.display()))?;
Ok(())
}
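/// Opens a BAM file (with default pipeline options) and returns a decoding reader
/// plus its header.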
pub fn create_bam_reader<P: AsRef<Path>>(
path: P,
threads: usize,
) -> Result<(BamReaderAuto, Header)> {
create_bam_reader_with_opts(path, threads, PipelineReaderOpts::default())
}
pub fn create_bam_reader_with_opts<P: AsRef<Path>>(
path: P,
threads: usize,
opts: PipelineReaderOpts,
) -> Result<(BamReaderAuto, Header)> {
let path_ref = path.as_ref();
let file = File::open(path_ref)
.with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
crate::os_hints::advise_sequential(&file);
let reader: Box<dyn Read + Send> = if opts.async_reader {
log::info!(
"async BAM reader enabled: spawning fgumi-prefetch thread for {}",
path_ref.display()
);
Box::new(crate::prefetch_reader::PrefetchReader::from_file(file))
} else {
Box::new(file)
};
let bgzf_reader = make_bgzf_reader(reader, threads);
let mut reader = noodles::bam::io::Reader::from(bgzf_reader);
let header = reader
.read_header()
.with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
Ok((reader, header))
}
pub type RawBamReaderAuto = RawBamReader<BgzfReaderEnum>;
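/// Opens a BAM file and returns a raw (undecoded) record reader plus its header.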
pub fn create_raw_bam_reader<P: AsRef<Path>>(
path: P,
threads: usize,
) -> Result<(RawBamReaderAuto, Header)> {
create_raw_bam_reader_with_opts(path, threads, PipelineReaderOpts::default())
}
pub fn create_raw_bam_reader_with_opts<P: AsRef<Path>>(
path: P,
threads: usize,
opts: PipelineReaderOpts,
) -> Result<(RawBamReaderAuto, Header)> {
let path_ref = path.as_ref();
let file = File::open(path_ref)
.with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
crate::os_hints::advise_sequential(&file);
let reader: Box<dyn Read + Send> = if opts.async_reader {
log::info!(
"async raw BAM reader enabled: spawning fgumi-prefetch thread for {}",
path_ref.display()
);
Box::new(crate::prefetch_reader::PrefetchReader::from_file(file))
} else {
Box::new(file)
};
let bgzf_reader = make_bgzf_reader(reader, threads);
let mut noodles_reader = noodles::bam::io::Reader::from(bgzf_reader);
let header = noodles_reader
.read_header()
.with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
let bgzf_reader = noodles_reader.into_inner();
let raw_reader = RawBamReader::new(bgzf_reader);
Ok((raw_reader, header))
}
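/// Opens a BAM input for the sort worker pool: parses the header up front (teeing
/// stdin, or reading and rewinding a regular file), hands the still-compressed stream
/// to the pool for decompression, and returns a raw reader over the pooled
/// decompressed stream positioned just past the header.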
pub fn create_raw_bam_reader_pool_integrated<P: AsRef<Path>>(
path: P,
pool: &std::sync::Arc<crate::sort::worker_pool::SortWorkerPool>,
async_reader: bool,
) -> Result<(RawBamReader<crate::sort::read_ahead::PooledInputStream>, Header)> {
use crate::sort::read_ahead::PooledInputStream;
use crate::sort::worker_pool::phase;
let path_ref = path.as_ref();
let (header, reader): (Header, Box<dyn io::Read + Send>) = if is_stdin_path(path_ref) {
let stdin = io::stdin();
let tee = TeeReader::new(stdin);
let bgzf = BgzfReader::new(tee);
let mut noodles_reader = noodles::bam::io::Reader::from(bgzf);
let header =
noodles_reader.read_header().with_context(|| "Failed to read BAM header from stdin")?;
let bgzf = noodles_reader.into_inner();
let tee = bgzf.into_inner();
let (buffered_bytes, stdin) = tee.into_parts();
let chained = ChainedReader::new(buffered_bytes, stdin);
(header, Box::new(io::BufReader::with_capacity(2 * 1024 * 1024, chained)))
} else {
use std::io::{Seek, SeekFrom};
let mut file = File::open(path_ref)
.with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
let header = {
let bgzf = BgzfReader::new(&mut file);
let mut noodles_reader = noodles::bam::io::Reader::from(bgzf);
noodles_reader
.read_header()
.with_context(|| format!("Failed to read header from: {}", path_ref.display()))?
};
file.seek(SeekFrom::Start(0))
.with_context(|| format!("Failed to rewind input BAM: {}", path_ref.display()))?;
let reader: Box<dyn io::Read + Send> = if async_reader {
crate::os_hints::advise_sequential(&file);
log::info!(
"async sort reader enabled: spawning fgumi-prefetch thread for {}",
path_ref.display()
);
Box::new(crate::prefetch_reader::PrefetchReader::from_file(file))
} else {
Box::new(io::BufReader::with_capacity(2 * 1024 * 1024, file))
};
(header, reader)
};
pool.set_input_file(reader);
pool.set_phase(phase::PHASE1);
let mut pooled_input = PooledInputStream::new(
pool.decompressed_input_queue(),
pool.decompressed_input_done_flag(),
pool.input_read_error_flag(),
pool.decompress_error_flag(),
);
skip_bam_header(&mut pooled_input)
.with_context(|| format!("Failed to skip header from: {}", path_ref.display()))?;
let raw_reader = RawBamReader::new(pooled_input);
Ok((raw_reader, header))
}
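/// Advances an uncompressed BAM stream past the magic, header text, and reference
/// dictionary so the next bytes are the first record's `block_size`.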
fn skip_bam_header<R: Read>(reader: &mut R) -> Result<()> {
let mut buf4 = [0u8; 4];
reader.read_exact(&mut buf4)?;
anyhow::ensure!(&buf4 == b"BAM\x01", "Not a BAM file (bad magic)");
reader.read_exact(&mut buf4)?;
let l_text = u32::from_le_bytes(buf4) as usize;
let copied = io::copy(&mut reader.take(l_text as u64), &mut io::sink())?;
anyhow::ensure!(
copied == l_text as u64,
"BAM header text truncated: expected {l_text} bytes, got {copied}"
);
reader.read_exact(&mut buf4)?;
let n_ref = u32::from_le_bytes(buf4) as usize;
for _ in 0..n_ref {
reader.read_exact(&mut buf4)?;
let l_name = u32::from_le_bytes(buf4) as usize;
let copied = io::copy(&mut reader.take(l_name as u64), &mut io::sink())?;
anyhow::ensure!(
copied == l_name as u64,
"BAM reference name truncated: expected {l_name} bytes, got {copied}"
);
        reader.read_exact(&mut buf4)?; // l_ref: reference sequence length, skipped
    }
Ok(())
}
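/// Creates a BGZF-compressed BAM writer for `path` (or stdout when the path is `-`)
/// and writes the header.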
pub fn create_bam_writer<P: AsRef<Path>>(
path: P,
header: &Header,
threads: usize,
compression_level: u32,
) -> Result<BamWriter> {
let path_ref = path.as_ref();
let output = open_output_writer(path_ref)?;
let bgzf_writer = make_bgzf_writer(output, threads, compression_level);
let mut writer = noodles::bam::io::Writer::from(bgzf_writer);
writer
.write_header(header)
.with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
Ok(writer)
}
pub fn create_optional_bam_writer<P: AsRef<Path>>(
path: Option<P>,
header: &Header,
threads: usize,
compression_level: u32,
) -> Result<Option<BamWriter>> {
match path {
Some(p) => Ok(Some(create_bam_writer(p, header, threads, compression_level)?)),
None => Ok(None),
}
}
pub fn is_stdin_path<P: AsRef<Path>>(path: P) -> bool {
let path_str = path.as_ref().to_string_lossy();
path_str == "-" || path_str == "/dev/stdin"
}
pub fn is_stdout_path<P: AsRef<Path>>(path: P) -> bool {
let path_str = path.as_ref().to_string_lossy();
path_str == "-" || path_str == "/dev/stdout"
}
pub(crate) fn open_output_writer<P: AsRef<Path>>(path: P) -> Result<Box<dyn Write + Send>> {
let path_ref = path.as_ref();
if is_stdout_path(path_ref) {
Ok(Box::new(std::io::stdout()))
} else {
let file = File::create(path_ref)
.with_context(|| format!("Failed to create output BAM: {}", path_ref.display()))?;
Ok(Box::new(file))
}
}
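/// A reader that records every byte it yields so the consumed prefix can be replayed;
/// used to re-chain stdin after the BAM header has been parsed.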
struct TeeReader<R> {
inner: R,
buffer: Vec<u8>,
}
impl<R: Read> TeeReader<R> {
fn new(inner: R) -> Self {
Self { inner, buffer: Vec::new() }
}
fn into_parts(self) -> (Vec<u8>, R) {
(self.buffer, self.inner)
}
}
impl<R: Read> Read for TeeReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = self.inner.read(buf)?;
self.buffer.extend_from_slice(&buf[..n]);
Ok(n)
}
}
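/// Serves bytes from an in-memory buffer first, then falls through to the inner
/// reader; the counterpart to `TeeReader` for replaying a consumed prefix.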
pub(crate) struct ChainedReader<R> {
buffer: io::Cursor<Vec<u8>>,
inner: R,
buffer_exhausted: bool,
}
impl<R: Read> ChainedReader<R> {
pub(crate) fn new(buffer: Vec<u8>, inner: R) -> Self {
Self { buffer: io::Cursor::new(buffer), inner, buffer_exhausted: false }
}
}
impl<R: Read> Read for ChainedReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if !self.buffer_exhausted {
let n = self.buffer.read(buf)?;
if n > 0 {
return Ok(n);
}
self.buffer_exhausted = true;
}
self.inner.read(buf)
}
}
pub fn create_bam_reader_for_pipeline<P: AsRef<Path>>(
path: P,
) -> Result<(Box<dyn Read + Send>, Header)> {
create_bam_reader_for_pipeline_with_opts(path, PipelineReaderOpts::default())
}
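/// Options for pipeline readers; `async_reader` enables a background prefetch thread.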
#[derive(Debug, Clone, Copy, Default)]
pub struct PipelineReaderOpts {
pub async_reader: bool,
}
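/// Opens a BAM input and returns its parsed header together with the compressed byte
/// stream rewound to the start, so downstream stages can decompress it themselves.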
pub fn create_bam_reader_for_pipeline_with_opts<P: AsRef<Path>>(
path: P,
opts: PipelineReaderOpts,
) -> Result<(Box<dyn Read + Send>, Header)> {
use std::io::{Seek, SeekFrom};
let path_ref = path.as_ref();
if is_stdin_path(path_ref) {
let stdin = io::stdin();
let tee_reader = TeeReader::new(stdin);
let bgzf_reader = BgzfReader::new(tee_reader);
let mut bam_reader = noodles::bam::io::Reader::from(bgzf_reader);
let header =
bam_reader.read_header().with_context(|| "Failed to read header from stdin")?;
let bgzf_reader = bam_reader.into_inner();
let tee_reader = bgzf_reader.into_inner();
let (buffered_bytes, stdin) = tee_reader.into_parts();
let chained = ChainedReader::new(buffered_bytes, stdin);
if opts.async_reader {
log::info!("async BAM reader enabled: spawning fgumi-prefetch thread for stdin");
let prefetch = crate::prefetch_reader::PrefetchReader::new(chained);
Ok((Box::new(prefetch), header))
} else {
Ok((Box::new(chained), header))
}
} else {
let mut file = File::open(path_ref)
.with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
crate::os_hints::advise_sequential(&file);
let bgzf_reader = BgzfReader::new(&file);
let mut bam_reader = noodles::bam::io::Reader::from(bgzf_reader);
let header = bam_reader
.read_header()
.with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
file.seek(SeekFrom::Start(0))
.with_context(|| format!("Failed to seek in file: {}", path_ref.display()))?;
if opts.async_reader {
log::info!(
"async BAM reader enabled: spawning fgumi-prefetch thread for {}",
path_ref.display()
);
let prefetch = crate::prefetch_reader::PrefetchReader::from_file(file);
Ok((Box::new(prefetch), header))
} else {
Ok((Box::new(file), header))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use noodles::sam::header::record::value::{Map, map::ReferenceSequence};
use std::num::NonZeroUsize;
use tempfile::NamedTempFile;
fn create_test_header() -> Header {
let mut builder = Header::builder();
let ref_seq = Map::<ReferenceSequence>::new(
NonZeroUsize::new(100).expect("100 is non-zero constant"),
);
builder = builder.add_reference_sequence(b"chr1", ref_seq);
builder.build()
}
#[test]
fn test_create_bam_reader_nonexistent_file() {
let result = create_bam_reader("/nonexistent/file.bam", 1);
assert!(result.is_err());
if let Err(e) = result {
let err_msg = e.to_string();
assert!(err_msg.contains("Failed to open input BAM"));
}
}
#[test]
fn test_create_bam_writer() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let writer = create_bam_writer(temp_file.path(), &header, 1, 6);
assert!(writer.is_ok());
Ok(())
}
#[test]
fn test_create_bam_writer_multithreaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let writer = create_bam_writer(temp_file.path(), &header, 4, 6);
assert!(writer.is_ok());
Ok(())
}
#[test]
fn test_create_bam_writer_invalid_path() {
let header = create_test_header();
let result = create_bam_writer("/invalid/path/output.bam", &header, 1, 6);
assert!(result.is_err());
if let Err(e) = result {
let err_msg = e.to_string();
assert!(err_msg.contains("Failed to create output BAM"));
}
}
#[test]
fn test_create_optional_bam_writer_some() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let writer = create_optional_bam_writer(Some(temp_file.path()), &header, 1, 6)?;
assert!(writer.is_some());
Ok(())
}
#[test]
fn test_create_optional_bam_writer_none() -> Result<()> {
let header = create_test_header();
let writer = create_optional_bam_writer(None::<&str>, &header, 1, 6)?;
assert!(writer.is_none());
Ok(())
}
#[test]
fn test_create_optional_bam_writer_invalid_path() {
let header = create_test_header();
let result = create_optional_bam_writer(Some("/invalid/path/output.bam"), &header, 1, 6);
assert!(result.is_err());
}
#[test]
fn test_roundtrip_write_and_read() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
{
let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
}
let (mut reader, read_header) = create_bam_reader(temp_file.path(), 1)?;
assert_eq!(read_header.reference_sequences().len(), 1);
let records: Result<Vec<_>, _> = reader.records().collect();
assert!(records.is_ok());
Ok(())
}
#[test]
fn test_roundtrip_write_and_read_multithreaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
{
let _writer = create_bam_writer(temp_file.path(), &header, 4, 6)?;
}
let (mut reader, read_header) = create_bam_reader(temp_file.path(), 4)?;
assert_eq!(read_header.reference_sequences().len(), 1);
let records: Result<Vec<_>, _> = reader.records().collect();
assert!(records.is_ok());
Ok(())
}
#[test]
fn test_bgzf_writer_flush_single_threaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));
writer.write_all(b"test data")?;
let result = writer.flush();
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_bgzf_writer_flush_multithreaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
let worker_count = NonZero::new(2).expect("2 is non-zero");
let compression_level = CompressionLevel::new(6).expect("valid compression level");
let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
worker_count,
output_file,
compression_level,
));
writer.write_all(b"test data")?;
let result = writer.flush();
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_bgzf_writer_finish_single_threaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));
writer.write_all(b"test data")?;
let result = writer.finish();
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_bgzf_writer_finish_multithreaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
let worker_count = NonZero::new(2).expect("2 is non-zero");
let compression_level = CompressionLevel::new(6).expect("valid compression level");
let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
worker_count,
output_file,
compression_level,
));
writer.write_all(b"test data")?;
let result = writer.finish();
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_bgzf_writer_write_single_threaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));
let bytes_written = writer.write(b"test")?;
assert_eq!(bytes_written, 4);
Ok(())
}
#[test]
fn test_bgzf_writer_write_multithreaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
let worker_count = NonZero::new(2).expect("2 is non-zero");
let compression_level = CompressionLevel::new(6).expect("valid compression level");
let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
worker_count,
output_file,
compression_level,
));
let bytes_written = writer.write(b"test")?;
assert_eq!(bytes_written, 4);
Ok(())
}
#[test]
fn test_is_stdin_path() {
assert!(is_stdin_path("-"));
assert!(is_stdin_path("/dev/stdin"));
assert!(is_stdin_path(Path::new("-")));
assert!(is_stdin_path(Path::new("/dev/stdin")));
assert!(!is_stdin_path("input.bam"));
assert!(!is_stdin_path("/path/to/file.bam"));
assert!(!is_stdin_path(""));
assert!(!is_stdin_path("/dev/null"));
}
#[test]
fn test_create_bam_reader_for_pipeline_from_file() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
{
let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
}
let (mut reader, read_header) = create_bam_reader_for_pipeline(temp_file.path())?;
assert_eq!(read_header.reference_sequences().len(), 1);
let mut buf = [0u8; 16];
let n = reader.read(&mut buf)?;
assert!(n > 0, "Should read some bytes from the file");
Ok(())
}
#[test]
fn test_create_bam_reader_for_pipeline_nonexistent_file() {
let result = create_bam_reader_for_pipeline("/nonexistent/file.bam");
assert!(result.is_err());
if let Err(e) = result {
let err_msg = e.to_string();
assert!(err_msg.contains("Failed to open input BAM"));
}
}
#[test]
fn test_chained_reader() {
let buffer = vec![1, 2, 3, 4, 5];
let remaining = io::Cursor::new(vec![6, 7, 8, 9, 10]);
let mut chained = ChainedReader::new(buffer, remaining);
let mut result = Vec::new();
chained.read_to_end(&mut result).expect("read_to_end should succeed");
assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}
#[test]
fn test_chained_reader_empty_buffer() {
let buffer = vec![];
let remaining = io::Cursor::new(vec![1, 2, 3]);
let mut chained = ChainedReader::new(buffer, remaining);
let mut result = Vec::new();
chained.read_to_end(&mut result).expect("read_to_end should succeed");
assert_eq!(result, vec![1, 2, 3]);
}
#[test]
fn test_chained_reader_empty_remaining() {
let buffer = vec![1, 2, 3];
let remaining = io::Cursor::new(vec![]);
let mut chained = ChainedReader::new(buffer, remaining);
let mut result = Vec::new();
chained.read_to_end(&mut result).expect("read_to_end should succeed");
assert_eq!(result, vec![1, 2, 3]);
}
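    /// Builds a minimal raw BAM record body (everything after the `block_size` prefix):
    /// the fixed 32-byte prelude, a NUL-terminated and padded read name, one 10M CIGAR
    /// op, ten 4-bit-encoded bases, and ten quality values.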
#[allow(clippy::cast_possible_truncation)]
fn create_test_bam_record(ref_id: i32, pos: i32, read_name: &[u8]) -> Vec<u8> {
let name_with_null = read_name.len() + 1;
let padding = (4 - (name_with_null % 4)) % 4;
let l_read_name = (name_with_null + padding) as u8;
let mapq: u8 = 60;
        let bin: u16 = 4681; // finest-level BAI bin for positions in the first 16 kbp window
        let n_cigar_op: u16 = 1;
        let flag: u16 = 0;
        let l_seq: u32 = 10;
let next_ref_id: i32 = -1;
let next_pos: i32 = -1;
let tlen: i32 = 0;
let mut record = Vec::new();
record.extend_from_slice(&ref_id.to_le_bytes());
record.extend_from_slice(&pos.to_le_bytes());
record.push(l_read_name);
record.push(mapq);
record.extend_from_slice(&bin.to_le_bytes());
record.extend_from_slice(&n_cigar_op.to_le_bytes());
record.extend_from_slice(&flag.to_le_bytes());
record.extend_from_slice(&l_seq.to_le_bytes());
record.extend_from_slice(&next_ref_id.to_le_bytes());
record.extend_from_slice(&next_pos.to_le_bytes());
record.extend_from_slice(&tlen.to_le_bytes());
record.extend_from_slice(read_name);
        record.push(0); // NUL terminator for the read name
        record.extend(std::iter::repeat_n(0u8, padding)); // pad the name field to a 4-byte boundary
        let cigar_op: u32 = 10 << 4; // one CIGAR op: 10M (length << 4 | op code, M = 0)
        record.extend_from_slice(&cigar_op.to_le_bytes());
        record.extend_from_slice(&[0x11, 0x11, 0x11, 0x11, 0x11]); // 10 bases packed two per byte
        record.extend_from_slice(&[30u8; 10]); // base qualities
record
}
#[test]
fn test_create_indexing_bam_writer() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2);
assert!(writer.is_ok());
let writer = writer.expect("creating writer should succeed");
let index = writer.finish();
assert!(index.is_ok());
Ok(())
}
#[test]
fn test_indexing_bam_writer_with_records() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;
for i in 0..100 {
let record = create_test_bam_record(0, i * 100, format!("read{i}").as_bytes());
writer.write_raw_record(&record)?;
}
let index = writer.finish()?;
assert!(!index.reference_sequences().is_empty());
Ok(())
}
#[test]
fn test_indexing_bam_writer_produces_valid_bam() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
{
let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;
for i in 0..10 {
let record = create_test_bam_record(0, i * 100, format!("read{i}").as_bytes());
writer.write_raw_record(&record)?;
}
let _index = writer.finish()?;
}
let (mut reader, read_header) = create_bam_reader(temp_file.path(), 1)?;
assert_eq!(read_header.reference_sequences().len(), 1);
        let records = reader.records().collect::<io::Result<Vec<_>>>()?;
        assert_eq!(records.len(), 10);
Ok(())
}
#[test]
fn test_indexing_bam_writer_multithreaded() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 4)?;
for i in 0..1000 {
let record = create_test_bam_record(0, i * 10, format!("read{i:04}").as_bytes());
writer.write_raw_record(&record)?;
}
let index = writer.finish()?;
assert!(!index.reference_sequences().is_empty());
Ok(())
}
#[test]
fn test_indexing_bam_writer_unmapped_records() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;
let record = create_test_bam_record(-1, -1, b"unmapped_read");
writer.write_raw_record(&record)?;
let record = create_test_bam_record(0, 100, b"mapped_read");
writer.write_raw_record(&record)?;
let index = writer.finish()?;
assert!(!index.reference_sequences().is_empty());
Ok(())
}
#[test]
fn test_write_raw_bytes() -> Result<()> {
let temp = tempfile::NamedTempFile::new()?;
let header = noodles::sam::Header::default();
let mut writer = create_raw_bam_writer(temp.path(), &header, 1, 1)?;
let record_bytes = vec![1, 2, 3, 4, 5];
let mut formatted = Vec::new();
#[allow(clippy::cast_possible_truncation)]
let len = record_bytes.len() as u32;
formatted.extend_from_slice(&len.to_le_bytes());
formatted.extend_from_slice(&record_bytes);
writer.write_raw_bytes(&formatted)?;
writer.finish()?;
assert!(temp.path().metadata()?.len() > 0);
Ok(())
}
#[test]
fn test_is_stdout_path() {
assert!(is_stdout_path("-"));
assert!(is_stdout_path("/dev/stdout"));
assert!(is_stdout_path(Path::new("-")));
assert!(is_stdout_path(Path::new("/dev/stdout")));
assert!(!is_stdout_path("output.bam"));
assert!(!is_stdout_path("/path/to/file.bam"));
assert!(!is_stdout_path(""));
assert!(!is_stdout_path("/dev/null"));
}
#[test]
fn test_indexing_writer_rejects_stdout() {
let header = Header::default();
let result = create_indexing_bam_writer("-", &header, 6, 2);
assert!(result.is_err());
let err = result.err().expect("result should be Err");
assert!(err.to_string().contains("stdout"));
}
#[test]
fn test_create_bam_reader_for_pipeline_with_async_reader() -> Result<()> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
{
let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
}
let opts = PipelineReaderOpts { async_reader: true };
let (mut reader, read_header) =
create_bam_reader_for_pipeline_with_opts(temp_file.path(), opts)?;
assert_eq!(read_header.reference_sequences().len(), 1);
let mut buf = [0u8; 16];
let n = reader.read(&mut buf)?;
assert!(n > 0, "Should read some bytes from the async reader");
let mut rest = Vec::new();
reader.read_to_end(&mut rest)?;
Ok(())
}
}