use std::fs::File;
use std::io::{Read, Seek, Result, Error};
use std::io::ErrorKind::{InvalidData, InvalidInput};
use std::path::{Path, PathBuf};
use super::index::{self, Index, Chunk, VirtualOffset};
use super::record;
use super::bgzip::{self, ReadBgzip};
use super::header::Header;
use super::RecordReader;
/// View over the records of an [`IndexedReader`] restricted to an interval.
///
/// Created by `IndexedReader::fetch`, `full`, `unmapped`, `fetch_chunks` and their `_by`
/// variants. The viewer mutably borrows its parent reader for its whole lifetime.
pub struct RegionViewer<'a, R: Read + Seek> {
    // Parent reader whose BGZF stream was restricted to the relevant chunks before creation.
    parent: &'a mut IndexedReader<R>,
    // Interval start used for overlap filtering against record positions.
    start: i32,
    // Interval end; reading stops at the first record whose start is >= `end`.
    end: i32,
    // Extra user filter; records for which it returns `false` are skipped.
    predicate: Box<dyn Fn(&record::Record) -> bool>,
}
impl<'a, R: Read + Seek> RegionViewer<'a, R> {
    /// BAM header of the reader this view was created from.
    pub fn header(&self) -> &Header {
        &self.parent.header
    }

    /// BAI index of the reader this view was created from.
    pub fn index(&self) -> &Index {
        &self.parent.index
    }

    /// Virtual offset at which the underlying BGZF reader is currently positioned.
    pub fn current_offset(&self) -> VirtualOffset {
        self.parent.reader.current_offset()
    }
}
impl<'a, R: Read + Seek> RecordReader for RegionViewer<'a, R> {
    /// Reads the next record that passes the predicate and overlaps the viewer's interval.
    ///
    /// Returns `Ok(true)` when a matching record was written into `record`. Returns `Ok(false)`
    /// when the underlying reader reports no more records, or when a record starting at or after
    /// `end` is reached.
    ///
    /// # Errors
    /// Returns an error in three cases:
    /// - reading fails;
    /// - the record's BAI bin exceeds `index::MAX_BIN`;
    /// - a mapped record ends before it starts.
    ///
    /// Whenever `Ok(true)` is not returned, `record` is cleared first.
    fn read_into(&mut self, record: &mut record::Record) -> Result<bool> {
        loop {
            let res = record.fill_from_bam(&mut self.parent.reader);
            // `Ok(false)` (presumably: no more records) or `Err`: clear and propagate as-is.
            if !res.as_ref().unwrap_or(&false) {
                record.clear();
                return res;
            }
            // Stop at the first record starting at or after the interval end. This assumes
            // records are sorted by start position, as BAI-indexed files are expected to be.
            if record.start() >= self.end {
                record.clear();
                return Ok(false);
            }
            // Skip records rejected by the user-supplied predicate.
            if !(self.predicate)(&record) {
                continue;
            }
            let record_bin = record.calculate_bin();
            if record_bin > index::MAX_BIN {
                record.clear();
                return Err(Error::new(InvalidData, "Read has BAI bin bigger than max possible value"));
            }
            // Fast path: if the bin's span lies inside [start, end], accept the record without
            // computing its alignment end. NOTE(review): relies on `bin_to_region` returning
            // the span covered by the bin.
            let (min_start, max_end) = index::bin_to_region(record_bin);
            if min_start >= self.start && max_end <= self.end {
                return Ok(true);
            }
            let record_end = record.calculate_end();
            // Sanity check: a mapped record cannot end before it starts.
            if record.flag().is_mapped() && record_end < record.start() {
                record.clear();
                return Err(Error::new(InvalidData, "Corrupted record: aln_end < aln_start"));
            }
            if record.flag().is_mapped() {
                // A mapped record overlaps when its alignment ends after the interval start.
                if record_end > self.start {
                    return Ok(true);
                }
            } else if record.start() >= self.start {
                // An unmapped record is accepted based on its start position alone.
                return Ok(true);
            }
            // Otherwise the record lies before the interval: read the next one.
        }
    }

    /// Forwards to the parent reader's `pause`.
    fn pause(&mut self) {
        self.parent.pause();
    }
}
impl<'a, R: Read + Seek> Iterator for RegionViewer<'a, R> {
    type Item = Result<record::Record>;

    /// Yields the next matching record in a freshly allocated `Record`.
    ///
    /// Ends with `None` once `read_into` reports that no record was read; read errors are
    /// surfaced as `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        let mut rec = record::Record::new();
        self.read_into(&mut rec)
            .map(|found| if found { Some(rec) } else { None })
            .transpose()
    }
}
/// Policy applied when the BAI index was last modified before its BAM file.
pub enum ModificationTime {
    /// Fail with an `InvalidInput` error.
    Error,
    /// Accept the older index silently.
    Ignore,
    /// Pass a warning message to the stored callback and continue.
    Warn(Box<dyn Fn(&str)>),
}

impl ModificationTime {
    /// Compares the modification times of `bam_path` and `bai_path` and applies the policy.
    ///
    /// If either modification time cannot be obtained, nothing is checked and `Ok(())` is
    /// returned.
    ///
    /// # Errors
    /// With [`ModificationTime::Error`], returns an `InvalidInput` error when the BAM file is
    /// newer than the index.
    fn check<T: AsRef<Path>, U: AsRef<Path>>(&self, bam_path: T, bai_path: U) -> Result<()> {
        let mtime = |path: &Path| path.metadata().and_then(|meta| meta.modified());
        let bam_time = mtime(bam_path.as_ref());
        let bai_time = mtime(bai_path.as_ref());

        let index_is_stale = match (bam_time, bai_time) {
            (Ok(bam), Ok(bai)) => bam > bai,
            _ => false,
        };

        if index_is_stale {
            const MESSAGE: &str = "the BAM file is younger than the BAI index";
            match self {
                ModificationTime::Error => return Err(Error::new(InvalidInput, MESSAGE)),
                ModificationTime::Warn(callback) => callback(MESSAGE),
                ModificationTime::Ignore => {}
            }
        }
        Ok(())
    }

    /// Builds a [`ModificationTime::Warn`] policy that reports through `warning`.
    pub fn warn<F: Fn(&str) + 'static>(warning: F) -> Self {
        Self::Warn(Box::new(warning))
    }
}
/// Builder for [`IndexedReader`] that controls the index location, the stale-index policy and
/// the number of additional decompression threads.
///
/// Defaults:
/// - index path `<bam_path>.bai`;
/// - [`ModificationTime::Error`];
/// - no additional threads.
pub struct IndexedReaderBuilder {
    // Explicit BAI path; `None` means "append `.bai` to the BAM path".
    bai_path: Option<PathBuf>,
    // Action taken when the index is older than the BAM file (checked by `from_path` only).
    modification_time: ModificationTime,
    // Additional threads passed to the BGZF reader.
    additional_threads: u16,
}

impl IndexedReaderBuilder {
    /// Creates a builder with the default settings.
    pub fn new() -> Self {
        Self {
            bai_path: None,
            modification_time: ModificationTime::Error,
            additional_threads: 0,
        }
    }

    /// Sets an explicit path to the BAI index (default: `<bam_path>.bai`).
    pub fn bai_path<P: AsRef<Path>>(&mut self, path: P) -> &mut Self {
        self.bai_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Sets the policy applied when the BAI index is older than the BAM file.
    pub fn modification_time(&mut self, modification_time: ModificationTime) -> &mut Self {
        self.modification_time = modification_time;
        self
    }

    /// Sets the number of additional threads used by the BGZF reader.
    pub fn additional_threads(&mut self, additional_threads: u16) -> &mut Self {
        self.additional_threads = additional_threads;
        self
    }

    /// Opens the BAM file at `bam_path` together with its index.
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - the modification-time policy rejects the index;
    /// - the BAM file or the index cannot be opened;
    /// - the BAM header cannot be read.
    pub fn from_path<P: AsRef<Path>>(&self, bam_path: P) -> Result<IndexedReader<File>> {
        let bam_path = bam_path.as_ref();
        let bai_path = match &self.bai_path {
            Some(path) => path.clone(),
            None => Self::default_bai_path(bam_path),
        };
        self.modification_time.check(bam_path, &bai_path)?;
        let reader = bgzip::SeekReader::from_path(bam_path, self.additional_threads)
            .map_err(|e| Error::new(e.kind(), format!("Failed to open BAM file: {}", e)))?;
        let index = Index::from_path(bai_path)
            .map_err(|e| Error::new(e.kind(), format!("Failed to open BAI index: {}", e)))?;
        IndexedReader::new(reader, index)
    }

    /// Creates an indexed reader from an already opened BAM stream and BAI stream.
    ///
    /// The modification-time policy is not applied here, since streams carry no file metadata.
    ///
    /// # Errors
    /// Fails if either stream cannot be read or the BAM header is invalid.
    pub fn from_streams<R: Read + Seek, T: Read>(&self, bam_stream: R, bai_stream: T)
            -> Result<IndexedReader<R>> {
        let reader = bgzip::SeekReader::from_stream(bam_stream, self.additional_threads)
            .map_err(|e| Error::new(e.kind(), format!("Failed to read BAM stream: {}", e)))?;
        let index = Index::from_stream(bai_stream)
            .map_err(|e| Error::new(e.kind(), format!("Failed to read BAI index: {}", e)))?;
        IndexedReader::new(reader, index)
    }

    /// Returns `<bam_path>.bai`.
    ///
    /// The suffix is appended to the raw OS string, so non-UTF-8 paths are preserved. The
    /// previous `Path::display` formatting lossily replaced invalid bytes.
    fn default_bai_path(bam_path: &Path) -> PathBuf {
        let mut name = bam_path.as_os_str().to_os_string();
        name.push(".bai");
        PathBuf::from(name)
    }
}

impl Default for IndexedReaderBuilder {
    fn default() -> Self {
        Self::new()
    }
}
/// Half-open interval `[start, end)` on the reference sequence with index `ref_id`.
///
/// `start <= end` is enforced by the constructor and every setter.
#[derive(Clone, Debug)]
pub struct Region {
    ref_id: u32,
    start: u32,
    end: u32,
}

impl Region {
    /// Creates a new region.
    ///
    /// # Panics
    /// Panics if `start > end`.
    pub fn new(ref_id: u32, start: u32, end: u32) -> Region {
        assert!(start <= end, "Region: start should not be greater than end ({} > {})", start, end);
        Region { ref_id, start, end }
    }

    /// Index of the reference sequence.
    pub fn ref_id(&self) -> u32 {
        self.ref_id
    }

    /// Inclusive start of the interval.
    pub fn start(&self) -> u32 {
        self.start
    }

    /// Exclusive end of the interval.
    pub fn end(&self) -> u32 {
        self.end
    }

    /// Number of positions covered, `end - start`. Cannot underflow because `start <= end`.
    pub fn len(&self) -> u32 {
        self.end - self.start
    }

    /// Returns `true` if the region covers no positions (`start == end`).
    pub fn is_empty(&self) -> bool {
        self.start == self.end
    }

    /// Sets the reference index.
    pub fn set_ref_id(&mut self, ref_id: u32) {
        self.ref_id = ref_id;
    }

    /// Sets the start.
    ///
    /// # Panics
    /// Panics if `start > self.end()`.
    pub fn set_start(&mut self, start: u32) {
        assert!(start <= self.end, "Region: start should not be greater than end ({} > {})", start, self.end);
        self.start = start;
    }

    /// Sets the end.
    ///
    /// # Panics
    /// Panics if `self.start() > end`.
    pub fn set_end(&mut self, end: u32) {
        assert!(self.start <= end, "Region: start should not be greater than end ({} > {})", self.start, end);
        self.end = end;
    }

    /// Returns `true` if position `pos` on reference `ref_id` lies in `[start, end)`.
    pub fn contains(&self, ref_id: u32, pos: u32) -> bool {
        self.ref_id == ref_id && self.start <= pos && pos < self.end
    }
}
/// BAM reader backed by a BAI index, supporting region queries such as
/// [`fetch`](IndexedReader::fetch).
pub struct IndexedReader<R: Read + Seek> {
    // Seekable BGZF reader; queries restrict it to index chunks via `set_chunks`.
    reader: bgzip::SeekReader<R>,
    // BAM header, read once when the reader is created.
    header: Header,
    // BAI index used to translate regions into chunks.
    index: Index,
}
impl IndexedReader<File> {
    /// Starts an [`IndexedReaderBuilder`] for configuring the index path, modification-time
    /// policy and thread count.
    pub fn build() -> IndexedReaderBuilder {
        IndexedReaderBuilder::new()
    }

    /// Opens the BAM file at `path` using the builder's default settings.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self> {
        let builder = Self::build();
        builder.from_path(path)
    }
}
impl<R: Read + Seek> IndexedReader<R> {
    /// Creates a reader from an opened BGZF stream and a parsed index, reading the BAM header
    /// from the stream.
    ///
    /// # Errors
    /// Returns an error if the header cannot be read.
    fn new(mut reader: bgzip::SeekReader<R>, index: Index) -> Result<Self> {
        // NOTE(review): presumably switches the reader to sequential mode so the header can be
        // read from the start of the file; confirm against `bgzip::SeekReader`.
        reader.make_consecutive();
        let header = Header::from_bam(&mut reader)?;
        Ok(Self { reader, header, index })
    }

    /// Returns a viewer over the records overlapping `region`.
    ///
    /// # Errors
    /// Same as [`fetch_by`](IndexedReader::fetch_by).
    pub fn fetch<'a>(&'a mut self, region: &Region) -> Result<RegionViewer<'a, R>> {
        self.fetch_by(region, |_| true)
    }

    /// Returns a viewer over the records overlapping `region` that are accepted by `predicate`.
    ///
    /// # Errors
    /// Returns an `InvalidInput` error if `region.ref_id()` is not a reference in the header,
    /// or if `region.end()` exceeds the reference length.
    pub fn fetch_by<'a, F>(&'a mut self, region: &Region, predicate: F) -> Result<RegionViewer<'a, R>>
    where F: 'static + Fn(&record::Record) -> bool
    {
        match self.header.reference_len(region.ref_id()) {
            None => return Err(Error::new(InvalidInput,
                format!("Failed to fetch records: out of bounds reference {}", region.ref_id()))),
            Some(len) if len < region.end() => return Err(Error::new(InvalidInput,
                format!("Failed to fetch records: end > reference length ({} > {})", region.end(), len))),
            _ => {},
        }
        // `end` (and hence `start`) is bounded by the reference length checked above.
        // NOTE(review): the `as i32` casts assume reference lengths fit in `i32`.
        let start = region.start() as i32;
        let end = region.end() as i32;
        let chunks = self.index.fetch_chunks(region.ref_id(), start, end);
        self.reader.set_chunks(chunks);
        Ok(RegionViewer {
            parent: self,
            start,
            end,
            predicate: Box::new(predicate),
        })
    }

    /// Returns a viewer over all records, starting from the index's start offset.
    pub fn full<'a>(&'a mut self) -> RegionViewer<'a, R> {
        self.full_by(|_| true)
    }

    /// Like [`full`](IndexedReader::full), but only yields records accepted by `predicate`.
    pub fn full_by<'a, F>(&'a mut self, predicate: F) -> RegionViewer<'a, R>
    where F: 'static + Fn(&record::Record) -> bool
    {
        // NOTE(review): if the index has no start offset, the previously configured chunks
        // stay in place; confirm that this is intended.
        if let Some(offset) = self.index.start_offset() {
            self.reader.set_chunks(vec![index::Chunk::new(offset, index::VirtualOffset::MAX)]);
        }
        RegionViewer {
            parent: self,
            // Effectively unbounded interval: positions do not filter anything out.
            start: i32::MIN,
            end: i32::MAX,
            predicate: Box::new(predicate),
        }
    }

    /// Returns a viewer over the records stored from the index's end offset onwards.
    ///
    /// NOTE(review): presumably these are the unmapped reads without a position.
    pub fn unmapped<'a>(&'a mut self) -> RegionViewer<'a, R> {
        self.unmapped_by(|_| true)
    }

    /// Like [`unmapped`](IndexedReader::unmapped), but only yields records accepted by
    /// `predicate`.
    pub fn unmapped_by<'a, F>(&'a mut self, predicate: F) -> RegionViewer<'a, R>
    where F: 'static + Fn(&record::Record) -> bool
    {
        // NOTE(review): if the index has no end offset, the previously configured chunks
        // stay in place; confirm that this is intended.
        if let Some(offset) = self.index.end_offset() {
            self.reader.set_chunks(vec![index::Chunk::new(offset, index::VirtualOffset::MAX)]);
        }
        RegionViewer {
            parent: self,
            // Iteration stops at the first record with start >= 0, so only records with a
            // negative start are yielded.
            start: -1,
            end: 0,
            predicate: Box::new(predicate),
        }
    }

    /// Returns a viewer over all records inside `chunks`, without positional filtering.
    pub fn fetch_chunks<'a, I>(&'a mut self, chunks: I) -> RegionViewer<'a, R>
    where I: IntoIterator<Item = Chunk>
    {
        self.fetch_chunks_by(chunks, |_| true)
    }

    /// Like [`fetch_chunks`](IndexedReader::fetch_chunks), but only yields records accepted by
    /// `predicate`.
    pub fn fetch_chunks_by<'a, F, I>(&'a mut self, chunks: I, predicate: F) -> RegionViewer<'a, R>
    where F: 'static + Fn(&record::Record) -> bool,
        I: IntoIterator<Item = Chunk>,
    {
        self.reader.set_chunks(chunks);
        RegionViewer {
            parent: self,
            start: i32::MIN,
            end: i32::MAX,
            predicate: Box::new(predicate),
        }
    }

    /// BAM header read when the reader was created.
    pub fn header(&self) -> &Header {
        &self.header
    }

    /// BAI index associated with this reader.
    pub fn index(&self) -> &Index {
        &self.index
    }

    /// Forwards to the BGZF reader's `pause`.
    pub fn pause(&mut self) {
        self.reader.pause();
    }

    /// Virtual offset at which the BGZF reader is currently positioned.
    pub fn current_offset(&self) -> VirtualOffset {
        self.reader.current_offset()
    }
}
/// Sequential (non-indexed) BAM reader that yields records in file order.
pub struct BamReader<R: Read> {
    // BGZF reader over the whole stream.
    reader: bgzip::ConsecutiveReader<R>,
    // BAM header, parsed when the reader is created.
    header: Header,
}
impl BamReader<File> {
    /// Opens the BAM file at `path` for sequential reading and parses its header.
    ///
    /// `additional_threads` is forwarded to the BGZF reader.
    ///
    /// # Errors
    /// Fails if the file cannot be opened or the header cannot be read.
    pub fn from_path<P: AsRef<Path>>(path: P, additional_threads: u16) -> Result<Self> {
        match File::open(path) {
            Ok(file) => Self::from_stream(file, additional_threads),
            Err(err) => Err(Error::new(err.kind(), format!("Failed to open BAM file: {}", err))),
        }
    }
}
impl<R: Read> BamReader<R> {
pub fn from_stream(stream: R, additional_threads: u16) -> Result<Self> {
let mut reader = bgzip::ConsecutiveReader::from_stream(stream, additional_threads);
let header = Header::from_bam(&mut reader)?;
Ok(Self { reader, header })
}
pub fn header(&self) -> &Header {
&self.header
}
}
impl<R: Read> RecordReader for BamReader<R> {
    /// Reads the next record of the file into `record`.
    ///
    /// Returns `Ok(false)` when no record was read. On `Ok(false)` or an error, `record` is
    /// cleared before the result is returned.
    fn read_into(&mut self, record: &mut record::Record) -> Result<bool> {
        let outcome = record.fill_from_bam(&mut self.reader);
        if !matches!(outcome, Ok(true)) {
            record.clear();
        }
        outcome
    }

    /// Forwards to the BGZF reader's `pause`.
    fn pause(&mut self) {
        self.reader.pause();
    }
}
impl<R: Read> Iterator for BamReader<R> {
    type Item = Result<record::Record>;

    /// Yields the next record in a freshly allocated `Record`.
    ///
    /// Ends with `None` once `read_into` reports that no record was read; read errors are
    /// surfaced as `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        let mut rec = record::Record::new();
        self.read_into(&mut rec)
            .map(|found| if found { Some(rec) } else { None })
            .transpose()
    }
}