use std::rc::Rc;
use std::io;
use std::cmp::min;
use super::{Record, RecordReader};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlnType {
Deletion,
Match,
Insertion(u32),
}
#[derive(Clone)]
pub struct PileupEntry {
record: Rc<Record>,
query_start: u32,
query_end: u32,
aln_query_end: u32,
ref_pos: u32,
cigar_index: usize,
cigar_remaining: u32,
}
impl PileupEntry {
fn new(record: Rc<Record>) -> Self {
let ref_pos = record.start();
let mut cigar_index = 0;
let mut query_pos = 0;
let cigar_remaining = loop {
let (len, op) = record.cigar().at(cigar_index);
if op.consumes_ref() {
break len;
}
if op.consumes_query() {
query_pos += len;
}
cigar_index += 1;
};
assert!(cigar_index < record.cigar().len(), "CIGAR cannot contain only insertions");
let aln_query_end = record.aligned_query_end();
let mut res = PileupEntry {
record,
query_start: query_pos,
query_end: query_pos,
aln_query_end,
ref_pos: ref_pos as u32,
cigar_index,
cigar_remaining,
};
res.update_query_end();
res
}
fn update_query_end(&mut self) {
let op = self.record.cigar().at(self.cigar_index).1;
self.query_end = if !op.consumes_query() {
self.query_start
} else if self.cigar_remaining == 1 {
let mut query_end = self.query_start + 1;
let mut i = self.cigar_index + 1;
while i < self.record.cigar().len() && query_end < self.aln_query_end {
let (len, op) = self.record.cigar().at(i);
if op.consumes_ref() {
break;
} else if op.consumes_query() {
query_end += len;
}
i += 1;
}
min(query_end, self.aln_query_end)
} else {
self.query_start + 1
};
}
fn move_forward(&mut self) -> bool {
let op = self.record.cigar().at(self.cigar_index).1;
self.cigar_remaining -= 1;
if op.consumes_ref() {
self.ref_pos += 1;
}
if op.consumes_query() {
self.query_start += 1;
}
while self.cigar_remaining == 0 {
self.cigar_index += 1;
if self.cigar_index == self.record.cigar().len() || self.query_start >= self.aln_query_end {
return false;
}
let (len, op) = self.record.cigar().at(self.cigar_index);
if op.consumes_ref() {
self.cigar_remaining = len;
} else if op.consumes_query() {
self.query_start += len;
}
}
self.update_query_end();
assert!(self.query_end <= self.aln_query_end);
true
}
pub fn record(&self) -> &Rc<Record> {
&self.record
}
pub fn query_start(&self) -> u32 {
self.query_start
}
pub fn query_end(&self) -> u32 {
self.query_end
}
pub fn len(&self) -> u32 {
self.query_end - self.query_start
}
pub fn aln_type(&self) -> AlnType {
match self.len() {
0 => AlnType::Deletion,
1 => AlnType::Match,
x => AlnType::Insertion(x - 1),
}
}
pub fn ref_pos(&self) -> u32 {
self.ref_pos
}
pub fn sequence(&self) -> Option<super::record::sequence::SubseqIter> {
if self.record.sequence().available() {
Some(self.record.sequence().subseq(self.query_start as usize..self.query_end as usize))
} else {
None
}
}
pub fn qualities(&self) -> Option<&[u8]> {
if self.record.qualities().available() {
Some(&self.record.qualities().raw()[self.query_start as usize..self.query_end as usize])
} else {
None
}
}
pub fn is_aln_start(&self) -> bool {
self.ref_pos == self.record.start() as u32
}
pub fn is_aln_end(&self) -> bool {
self.query_end == self.aln_query_end
}
pub fn cigar_index(&self) -> usize {
self.cigar_index
}
pub fn cigar_remaining(&self) -> u32 {
self.cigar_remaining
}
}
pub struct Pileup<'a, I: Iterator<Item = io::Result<Record>>> {
record_iter: &'a mut I,
read_filter: Box<dyn Fn(&Record) -> bool>,
entries: Vec<PileupEntry>,
error: Option<io::Error>,
last_ref_id: u32,
last_ref_pos: u32,
}
impl<'a, I: Iterator<Item = io::Result<Record>>> Pileup<'a, I> {
pub fn new(record_iter: &'a mut I) -> Self {
Self::with_filter(record_iter, |_| true)
}
pub fn with_filter<F: 'static + Fn(&Record) -> bool>(record_iter: &'a mut I, read_filter: F) -> Self {
let mut res = Pileup {
record_iter,
read_filter: Box::new(read_filter),
entries: Vec::new(),
error: None,
last_ref_id: 0,
last_ref_pos: 0,
};
res.read_next();
res
}
fn record_passes(&self, record: &Record) -> bool {
if !record.flag().is_mapped() {
return false;
}
assert!(record.ref_id() >= 0 && record.start() >= 0);
(self.read_filter)(record)
}
fn read_next(&mut self) {
if self.last_ref_id == std::u32::MAX || self.error.is_some() {
return;
}
loop {
match self.record_iter.next() {
None => self.last_ref_id = std::u32::MAX,
Some(Ok(record)) => {
if !self.record_passes(&record) {
continue;
}
let rec_ref_id = record.ref_id() as u32;
let rec_start = record.start() as u32;
if rec_ref_id < self.last_ref_id
|| (rec_ref_id == self.last_ref_id && rec_start < self.last_ref_pos) {
self.error = Some(io::Error::new(io::ErrorKind::InvalidData, "Input file is unsorted"));
self.last_ref_id = std::u32::MAX;
}
self.last_ref_id = rec_ref_id;
self.last_ref_pos = rec_start;
self.entries.push(PileupEntry::new(Rc::new(record)));
},
Some(Err(e)) => {
self.error = Some(e);
self.last_ref_id = std::u32::MAX;
},
}
return;
}
}
}
impl<'a, R: RecordReader> Iterator for Pileup<'a, R> {
type Item = io::Result<PileupColumn>;
fn next(&mut self) -> Option<Self::Item> {
if self.error.is_some() {
self.entries.clear();
self.last_ref_id = std::u32::MAX;
return Some(Err(self.error.take().unwrap()));
}
let mut new_ref_id = std::u32::MAX;
let mut new_ref_pos = std::u32::MAX;
while new_ref_id == std::u32::MAX && (!self.entries.is_empty() || self.last_ref_id < std::u32::MAX) {
for entry in self.entries.iter() {
let rec_ref_id = entry.record.ref_id() as u32;
if rec_ref_id < new_ref_id {
new_ref_id = rec_ref_id;
new_ref_pos = entry.ref_pos;
} else if rec_ref_id == new_ref_id {
new_ref_pos = min(new_ref_pos, entry.ref_pos);
}
}
while self.last_ref_id < std::u32::MAX
&& self.last_ref_id <= new_ref_id && self.last_ref_pos <= new_ref_pos {
self.read_next();
}
if self.error.is_some() {
self.entries.clear();
self.last_ref_id = std::u32::MAX;
return Some(Err(self.error.take().unwrap()));
}
}
let mut entries = Vec::new();
for i in (0..self.entries.len()).rev() {
let entry = &mut self.entries[i];
let rec_ref_id = entry.record.ref_id() as u32;
if rec_ref_id == new_ref_id && entry.ref_pos == new_ref_pos {
entries.push(entry.clone());
if !entry.move_forward() {
std::mem::drop(entry);
self.entries.swap_remove(i);
}
} else {
assert!(rec_ref_id > new_ref_id || entry.ref_pos > new_ref_pos,
"Record is to the left of the new pileup position");
}
}
if entries.is_empty() {
None
} else {
Some(Ok(PileupColumn {
entries,
ref_id: new_ref_id,
ref_pos: new_ref_pos,
}))
}
}
}
#[derive(Clone)]
pub struct PileupColumn {
entries: Vec<PileupEntry>,
ref_id: u32,
ref_pos: u32,
}
impl PileupColumn {
pub fn entries(&self) -> &[PileupEntry] {
&self.entries
}
pub fn sort(&mut self) {
self.entries.sort_by(|a, b| (a.record.start(), a.record.name()).cmp(&(b.record.start(), b.record.name())))
}
pub fn ref_id(&self) -> u32 {
self.ref_id
}
pub fn ref_pos(&self) -> u32 {
self.ref_pos
}
}