use std::fmt;
use std::io::{self, Read};
use std::error::Error;
use async_stream::stream;
use byteorder::{LittleEndian, ReadBytesExt};
use futures::executor::block_on_stream;
use futures_core::stream::Stream;
use futures::StreamExt;
use crate::bgzf::BgzfReader;
use crate::genome::Genome;
use crate::netfile::NetFile;
use crate::range::Range;
use crate::read;
use crate::utility_io::{read_until_null, skip_n_bytes};
#[derive(Clone, Debug, Default)]
pub struct BamSeq(pub Vec<u8>);
impl fmt::Display for BamSeq {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let t = b"=ACMGRSVTWYHKDBN";
for &byte in &self.0 {
let b1 = byte >> 4;
let b2 = byte & 0xf;
write!(f, "{}", t[b1 as usize] as char)?;
if b2 != 0 {
write!(f, "{}", t[b2 as usize] as char)?;
}
}
Ok(())
}
}
#[derive(Clone, Debug, Default)]
pub struct BamQual(pub Vec<u8>);
impl fmt::Display for BamQual {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for &byte in &self.0 {
write!(f, "{}", (byte + 33) as char)?;
}
Ok(())
}
}
#[derive(Clone, Debug, Default)]
pub struct BamAuxiliary {
pub tag : [u8; 2],
pub value: BamAuxValue,
}
#[derive(Clone, Debug)]
pub enum BamAuxValue {
A (u8),
C (i8),
CUnsigned (u8),
S (i16),
SUnsigned (u16),
I (i32),
IUnsigned (u32),
F (f32),
D (f64),
Z (String),
H (String),
BInt8 (Vec<i8>),
BUint8 (Vec<u8>),
BInt16 (Vec<i16>),
BUint16 (Vec<u16>),
BInt32 (Vec<i32>),
BUint32 (Vec<u32>),
BFloat32 (Vec<f32>),
None (),
}
impl Default for BamAuxValue {
fn default() -> Self {
Self::None()
}
}
impl fmt::Display for BamAuxiliary {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}{}:", self.tag[0] as char, self.tag[1] as char)?;
match &self.value {
BamAuxValue::A(v) => write!(f, "{}", *v as char),
BamAuxValue::C(v) => write!(f, "{}", v),
BamAuxValue::CUnsigned(v) => write!(f, "{}", v),
BamAuxValue::S(v) => write!(f, "{}", v),
BamAuxValue::SUnsigned(v) => write!(f, "{}", v),
BamAuxValue::I(v) => write!(f, "{}", v),
BamAuxValue::IUnsigned(v) => write!(f, "{}", v),
BamAuxValue::F(v) => write!(f, "{}", v),
BamAuxValue::D(v) => write!(f, "{}", v),
BamAuxValue::Z(v) => write!(f, "{}", v),
BamAuxValue::H(v) => write!(f, "{}", v),
BamAuxValue::BInt8(v) => write!(f, "{:?}", v),
BamAuxValue::BUint8(v) => write!(f, "{:?}", v),
BamAuxValue::BInt16(v) => write!(f, "{:?}", v),
BamAuxValue::BUint16(v) => write!(f, "{:?}", v),
BamAuxValue::BInt32(v) => write!(f, "{:?}", v),
BamAuxValue::BUint32(v) => write!(f, "{:?}", v),
BamAuxValue::BFloat32(v) => write!(f, "{:?}", v),
BamAuxValue::None() => panic!("internal error"),
}
}
}
impl BamAuxiliary {
fn read<R: Read>(reader: &mut R) -> io::Result<(u64, Self)> {
let mut tag = [0; 2];
let mut n = 0 as u64;
reader.read_exact(&mut tag)?; n += 2;
let value_type = reader.read_u8()?; n += 1;
let value = match value_type {
b'A' => {n += 1; BamAuxValue::A(reader.read_u8()?)},
b'c' => {n += 1; BamAuxValue::C(reader.read_i8()?)},
b'C' => {n += 1; BamAuxValue::CUnsigned(reader.read_u8()?)},
b's' => {n += 2; BamAuxValue::S(reader.read_i16::<LittleEndian>()?)},
b'S' => {n += 2; BamAuxValue::SUnsigned(reader.read_u16::<LittleEndian>()?)},
b'i' => {n += 4; BamAuxValue::I(reader.read_i32::<LittleEndian>()?)},
b'I' => {n += 4; BamAuxValue::IUnsigned(reader.read_u32::<LittleEndian>()?)},
b'f' => {n += 4; BamAuxValue::F(reader.read_f32::<LittleEndian>()?)},
b'd' => {n += 8; BamAuxValue::D(reader.read_f64::<LittleEndian>()?)},
b'Z' => {
let buffer : Vec<u8> = read_until_null(reader)?; n += (buffer.len() + 1) as u64;
BamAuxValue::Z(String::from_utf8_lossy(&buffer).to_string())
}
b'H' => {
let buffer : Vec<u8> = read_until_null(reader)?; n += (buffer.len() + 1) as u64;
BamAuxValue::H(buffer.iter().map(|b| format!("{:X}", b)).collect::<String>())
}
b'B' => {
let array_type = reader.read_u8()?; n += 1;
let array_len = reader.read_i32::<LittleEndian>()?; n += 4;
match array_type {
b'c' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_i8()?); n += 1;
}
BamAuxValue::BInt8(vec)
}
b'C' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_u8()?); n += 1;
}
BamAuxValue::BUint8(vec)
}
b's' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_i16::<LittleEndian>()?); n += 2;
}
BamAuxValue::BInt16(vec)
}
b'S' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_u16::<LittleEndian>()?); n += 2;
}
BamAuxValue::BUint16(vec)
}
b'i' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_i32::<LittleEndian>()?); n += 4;
}
BamAuxValue::BInt32(vec)
}
b'I' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_u32::<LittleEndian>()?); n += 4;
}
BamAuxValue::BUint32(vec)
}
b'f' => {
let mut vec = Vec::with_capacity(array_len as usize);
for _ in 0..array_len {
vec.push(reader.read_f32::<LittleEndian>()?); n += 4;
}
BamAuxValue::BFloat32(vec)
}
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid array type")),
}
}
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid auxiliary type")),
};
Ok((n, BamAuxiliary { tag, value }))
}
}
#[derive(Clone, Debug, Default)]
pub struct BamFlag(pub u16);
impl BamFlag {
pub fn bit(&self, i: u8) -> bool {
(self.0 >> i) & 1 == 1
}
pub fn read_paired(&self) -> bool {
self.bit(0)
}
pub fn read_mapped_proper_paired(&self) -> bool {
self.bit(1)
}
pub fn unmapped(&self) -> bool {
self.bit(2)
}
pub fn mate_unmapped(&self) -> bool {
self.bit(3)
}
pub fn reverse_strand(&self) -> bool {
self.bit(4)
}
pub fn mate_reverse_strand(&self) -> bool {
self.bit(5)
}
pub fn first_in_pair(&self) -> bool {
self.bit(6)
}
pub fn second_in_pair(&self) -> bool {
self.bit(7)
}
pub fn secondary_alignment(&self) -> bool {
self.bit(8)
}
pub fn not_passing_filters(&self) -> bool {
self.bit(9)
}
pub fn duplicate(&self) -> bool {
self.bit(10)
}
}
#[derive(Clone, Debug, Default)]
pub struct BamCigar(pub Vec<u32>);
impl fmt::Display for BamCigar {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for cigar_block in self.parse_cigar() {
write!(f, "{}{}", cigar_block.n, cigar_block.type_)?;
}
Ok(())
}
}
impl BamCigar {
pub fn alignment_length(&self) -> usize {
let mut length = 0;
for cigar_block in self.parse_cigar() {
match cigar_block.type_ {
'M' | 'D' | 'N' | '=' | 'X' => length += cigar_block.n as usize,
_ => {}
}
}
length
}
pub fn parse_cigar(&self) -> impl Iterator<Item = CigarBlock> + '_ {
let types = b"MIDNSHP=X";
self.0.iter().map(move |&c| {
let n = c >> 4;
let t = types[(c & 0xf) as usize] as char;
CigarBlock { n: n as i32, type_: t }
})
}
}
#[derive(Debug, Default)]
pub struct CigarBlock {
pub n : i32,
pub type_: char,
}
#[derive(Clone, Debug, Default)]
pub struct BamHeader {
pub text_length: i32,
pub text : String,
pub n_ref : i32,
}
#[derive(Clone, Debug, Default)]
pub struct BamBlock {
pub ref_id : i32,
pub position : i32,
pub bin : u16,
pub mapq : u8,
pub rname_len : u8,
pub flag : BamFlag,
pub n_cigar_op : u16,
pub l_seq : i32,
pub next_ref_id : i32,
pub next_position: i32,
pub tlen : i32,
pub read_name : String,
pub cigar : BamCigar,
pub seq : BamSeq,
pub qual : BamQual,
pub auxiliary : Vec<BamAuxiliary>,
}
#[derive(Clone, Debug, Default)]
pub struct BamReaderType1 {
pub block: BamBlock,
}
#[derive(Clone, Debug, Default)]
pub struct BamReaderType2 {
pub block1: BamBlock,
pub block2: BamBlock,
}
#[derive(Copy, Clone, Debug, Default)]
pub struct BamReaderOptions {
pub read_name : bool,
pub read_cigar : bool,
pub read_sequence : bool,
pub read_auxiliary: bool,
pub read_qual : bool,
}
#[derive(Debug)]
pub struct BamReader<R: Read> {
options : BamReaderOptions,
header : BamHeader,
genome : Genome,
reader : BgzfReader<R>,
}
impl<R: Read> BamReader<R> {
pub fn new(reader: R, options_arg: Option<BamReaderOptions>) -> io::Result<Self> {
let mut bam_reader = BamReader {
options : options_arg.unwrap_or_default(),
genome : Genome::default(),
header : BamHeader::default(),
reader : BgzfReader::new(reader)?,
};
if options_arg.is_none() {
bam_reader.options.read_name = true;
bam_reader.options.read_cigar = true;
bam_reader.options.read_sequence = true;
bam_reader.options.read_auxiliary = true;
bam_reader.options.read_qual = true;
}
let mut magic = [0; 4];
bam_reader.reader.read_exact(&mut magic)?;
if &magic != b"BAM\x01" {
return Err(io::Error::new(io::ErrorKind::InvalidData, "not a BAM file"));
}
bam_reader.header.text_length = bam_reader.reader.read_i32::<LittleEndian>()?;
let mut text_bytes = vec![0; bam_reader.header.text_length as usize];
bam_reader.reader.read_exact(&mut text_bytes)?;
bam_reader.header.text = String::from_utf8(text_bytes).unwrap();
bam_reader.header.n_ref = bam_reader.reader.read_i32::<LittleEndian>()?;
for _ in 0..bam_reader.header.n_ref {
let length_name = bam_reader.reader.read_i32::<LittleEndian>()?;
let mut name_bytes = vec![0; length_name as usize];
bam_reader.reader.read_exact(&mut name_bytes)?;
let length_seq = bam_reader.reader.read_i32::<LittleEndian>()?;
bam_reader.genome.add_sequence(
String::from_utf8(name_bytes).unwrap().trim_matches('\0').to_string(),
length_seq as usize,
).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
}
Ok(bam_reader)
}
pub fn get_genome(&self) -> &Genome {
&self.genome
}
}
impl<R: Read> BamReader<R> {
pub fn read_single_end_stream<'a>(
&'a mut self
) -> impl Stream<Item = io::Result<BamReaderType1>> + 'a {
stream! {
let mut block_size: i32;
let mut flag_nc : u32;
let mut bin_mq_nl : u32;
loop {
let mut block = BamBlock::default();
let mut buf = Vec::new();
block_size = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return;
}
yield Err(e); return;
}
};
block.ref_id = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
block.position = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
bin_mq_nl = match self.reader.read_u32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
block.bin = ((bin_mq_nl >> 16) & 0xffff) as u16;
block.mapq = ((bin_mq_nl >> 8) & 0xff ) as u8;
block.rname_len = (bin_mq_nl & 0xff) as u8;
flag_nc = match self.reader.read_u32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
block.flag = BamFlag((flag_nc >> 16) as u16);
block.n_cigar_op = (flag_nc & 0xffff) as u16;
block.l_seq = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
block.next_ref_id = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
block.next_position = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
block.tlen = match self.reader.read_i32::<LittleEndian>() {
Ok (v) => v,
Err(e) => { yield Err(e); return; }
};
loop {
match self.reader.read_u8() {
Ok (b) if b == 0 => {
block.read_name = String::from_utf8(buf.clone()).unwrap();
break;
}
Ok (b) => buf.push(b),
Err(e) => { yield Err(e); return; }
}
}
if self.options.read_cigar {
block.cigar = BamCigar(Vec::with_capacity(block.n_cigar_op as usize));
for _ in 0..block.n_cigar_op {
if let Err(e) = self.reader.read_u32::<LittleEndian>().map(|v| block.cigar.0.push(v)) {
yield Err(e); return;
}
}
} else {
if let Err(e) = skip_n_bytes(&mut self.reader, (block.n_cigar_op * 4) as usize) {
yield Err(e); return;
}
}
if self.options.read_sequence {
let seq_len = (block.l_seq + 1) / 2;
block.seq = BamSeq(vec![0; seq_len as usize]);
if let Err(e) = self.reader.read_exact(&mut block.seq.0) {
yield Err(e); return;
}
} else {
let seq_len = (block.l_seq + 1) / 2;
if let Err(e) = skip_n_bytes(&mut self.reader, seq_len as usize) {
yield Err(e); return;
}
}
if self.options.read_qual {
block.qual = BamQual(vec![0; block.l_seq as usize]);
if let Err(e) = self.reader.read_exact(&mut block.qual.0) {
yield Err(e); return;
}
} else {
if let Err(e) = skip_n_bytes(&mut self.reader, block.l_seq as usize) {
yield Err(e); return;
}
}
let mut position = (8 * 4 + block.rname_len as usize + 4 * block.n_cigar_op as usize
+ (block.l_seq as usize + 1) / 2 + block.l_seq as usize) as i32;
if self.options.read_auxiliary {
while position < block_size {
match BamAuxiliary::read(&mut self.reader) {
Ok ((bytes_read, aux)) => {
block.auxiliary.push(aux);
position += bytes_read as i32;
}
Err(e) => { yield Err(e); return; }
}
}
} else {
if let Err(e) = skip_n_bytes(&mut self.reader, (block_size - position) as usize) {
yield Err(e); return;
}
}
yield Ok(BamReaderType1{
block
});
}
}
}
pub fn read_paired_end_stream<'a>(
&'a mut self
) -> impl Stream<Item = io::Result<BamReaderType2>> + 'a {
self.options.read_name = true;
stream! {
let mut cache : std::collections::HashMap<String, BamBlock> = std::collections::HashMap::new();
let mut iterator = Box::pin(self.read_single_end_stream());
while let Some(item) = iterator.next().await {
match item {
Err(e) => yield Err(e),
Ok (r) => {
let block1 = r.block;
if block1.flag.read_paired() {
if let Some(block2) = cache.remove(&block1.read_name) {
let paired_block = if block1.position < block2.position {
BamReaderType2 {
block1: block1,
block2: block2,
}
} else {
BamReaderType2 {
block1: block2,
block2: block1,
}
};
yield Ok(paired_block);
} else {
cache.insert(block1.read_name.clone(), block1);
}
}
}
}
}
}
}
pub fn read_simple_stream<'a>(
&'a mut self,
join_pairs: bool,
paired_end_strand_specific: bool
) -> impl Stream<Item = io::Result<read::Read>> + 'a {
let genome = self.genome.clone();
stream!{
self.options.read_cigar = true;
let mut iterator = Box::pin(self.read_paired_end_stream());
while let Some(item) = iterator.next().await {
match item {
Err(e) => {yield Err(e); break},
Ok (r) => {
if r.block1.flag.read_paired() && join_pairs {
if r.block1.flag.unmapped() || !r.block1.flag.read_mapped_proper_paired() {
continue;
}
if r.block2.flag.unmapped() || !r.block2.flag.read_mapped_proper_paired() {
continue;
}
let seqname = genome.seqnames[r.block1.ref_id as usize].clone();
let from = r.block1.position;
let to = r.block2.position + r.block2.cigar.alignment_length() as i32;
let mut strand = b'*';
let duplicate = r.block1.flag.duplicate() || r.block2.flag.duplicate();
let mapq = std::cmp::min(r.block1.mapq as i32, r.block2.mapq as i32);
if from < 0 {
yield Err(io::Error::new(io::ErrorKind::InvalidData, format!("invalid position detected: from={}", from)));
}
if paired_end_strand_specific {
if r.block1.flag.second_in_pair() {
strand = if r.block1.flag.reverse_strand() { b'-' } else { b'+' };
} else {
strand = if r.block2.flag.reverse_strand() { b'-' } else { b'+' };
}
}
yield Ok(read::Read {
seqname : seqname,
range : Range::new(from as usize, to as usize),
strand : strand as char,
mapq : mapq as i64,
duplicate : duplicate,
paired_end: true,
});
} else if !r.block1.flag.unmapped() {
let seqname = genome.seqnames[r.block1.ref_id as usize].clone();
let from = r.block1.position;
let to = r.block1.position + r.block1.cigar.alignment_length() as i32;
let strand = if r.block1.flag.reverse_strand() { b'-' } else { b'+' };
let mapq = r.block1.mapq as i32;
let duplicate = r.block1.flag.duplicate();
let paired = r.block1.flag.read_paired();
yield Ok(read::Read {
seqname : seqname,
range : Range::new(from as usize, to as usize),
strand : strand as char,
mapq : mapq as i64,
duplicate : duplicate,
paired_end: paired,
});
}
}
}
}
}
}
}
impl<R: Read> BamReader<R> {
pub fn read_single_end<'a>(&'a mut self) -> impl Iterator<Item = io::Result<BamReaderType1>> + 'a {
let s = Box::pin(self.read_single_end_stream());
block_on_stream(s)
}
pub fn read_paired_end<'a>(&'a mut self) -> impl Iterator<Item = io::Result<BamReaderType2>> + 'a {
let s = Box::pin(self.read_paired_end_stream());
block_on_stream(s)
}
pub fn read_simple<'a>(
&'a mut self,
join_pairs: bool,
paired_end_strand_specific: bool
) -> impl Iterator<Item = io::Result<read::Read>> + 'a {
let s = Box::pin(self.read_simple_stream(join_pairs, paired_end_strand_specific));
block_on_stream(s)
}
}
#[derive(Debug)]
pub struct BamFile {
pub reader: BamReader<NetFile>,
}
impl BamFile {
pub fn open(filename: &str, options: Option<BamReaderOptions>) -> Result<Self, Box<dyn Error>> {
let file = NetFile::open(filename)?;
let reader = BamReader::new(file, options)?;
Ok(BamFile {
reader : reader,
})
}
}
pub fn bam_read_genome<R: Read>(reader: R) -> Result<Genome, Box<dyn Error>> {
let bam_reader = BamReader::<R>::new(reader, Some(BamReaderOptions::default()))?;
Ok(bam_reader.genome)
}
pub fn bam_import_genome(filename: &str) -> Result<Genome, Box<dyn Error>> {
let file = NetFile::open(filename)?;
Ok(bam_read_genome(file)?)
}
#[cfg(test)]
mod tests {
use crate::bam::BamFile;
#[test]
fn test_bam_genome() {
let result = BamFile::open("tests/test_bam_1.bam", None);
assert!(result.is_ok());
if let Ok(bam) = result {
let genome = bam.reader.genome;
assert_eq!(genome.len(), 2);
assert_eq!(genome.seqnames[0], "ref");
assert_eq!(genome.seqnames[1], "ref2");
assert_eq!(genome.lengths [0], 45);
assert_eq!(genome.lengths [1], 40);
}
}
#[test]
fn test_bam_read_simple() {
let result = BamFile::open("tests/test_bam_2.bam", None);
assert!(result.is_ok());
let mut bam = result.unwrap();
let mut cnt = 0;
for item in bam.reader.read_simple(true, true) {
if !item.is_ok() {
break;
}
cnt += 1;
}
assert_eq!(cnt, 2335);
}
}