use std::cmp::Ordering;
use std::collections::{vec_deque, VecDeque};
use std::mem;
use crate::bcf::{self, Read};
use crate::errors::Result;
/// Buffers records pulled sequentially from a [`bcf::Reader`] so that callers can step through
/// successive windows with [`RecordBuffer::fetch`] and inspect the current window via
/// [`RecordBuffer::iter`].
///
/// The reader is only ever advanced, never rewound.
/// NOTE(review): this presumably requires input sorted by contig and position — confirm.
#[derive(Debug)]
pub struct RecordBuffer {
/// Source of records; advanced by `fetch`.
reader: bcf::Reader,
/// Records of the current window; this is what `iter`, `iter_mut` and `len` expose.
ringbuffer: VecDeque<bcf::Record>,
/// Holds a record read for a later contig (higher rid) than the one requested; it is swapped
/// into `ringbuffer` by a subsequent `fetch`.
ringbuffer2: VecDeque<bcf::Record>,
/// A record on the requested contig that was read but lies at or beyond the window end; it is
/// reconsidered by the next `fetch`.
overflow: Option<bcf::Record>,
}
// NOTE(review): these manual impls assert that `RecordBuffer` can be shared between and moved
// across threads. Nothing in this file establishes that; soundness depends on `bcf::Reader` and
// `bcf::Record` being safe to use that way — confirm and replace this note with a `SAFETY:`
// justification.
unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}
impl RecordBuffer {
    /// Creates an empty buffer that will pull its records from `reader`.
    pub fn new(reader: bcf::Reader) -> Self {
        RecordBuffer {
            reader,
            ringbuffer: VecDeque::new(),
            ringbuffer2: VecDeque::new(),
            overflow: None,
        }
    }

    /// Rid of the last record in the active buffer, or `None` if the buffer is empty.
    ///
    /// # Panics
    ///
    /// Panics if that record has no rid; `fetch` only buffers records whose rid is known.
    fn last_rid(&self) -> Option<u32> {
        self.ringbuffer
            .back()
            .map(|rec| rec.rid().expect("buffered records always carry a rid"))
    }

    /// Rid of the last record in the look-ahead buffer, or `None` if that buffer is empty.
    ///
    /// # Panics
    ///
    /// Panics if that record has no rid; `fetch` only buffers records whose rid is known.
    fn next_rid(&self) -> Option<u32> {
        self.ringbuffer2
            .back()
            .map(|rec| rec.rid().expect("buffered records always carry a rid"))
    }

    /// Makes the look-ahead buffer the active one and discards the previous active contents.
    fn swap_buffers(&mut self) {
        mem::swap(&mut self.ringbuffer2, &mut self.ringbuffer);
        self.ringbuffer2.clear();
    }

    /// Removes the leading run of records that lie left of `window_start` or belong to a rid
    /// other than `rid`, and returns how many were removed.
    fn drain_left(&mut self, rid: u32, window_start: u64) -> usize {
        let to_remove = self
            .ringbuffer
            .iter()
            .take_while(|rec| {
                (rec.pos() as u64) < window_start
                    || rec.rid().expect("buffered records always carry a rid") != rid
            })
            .count();
        self.ringbuffer.drain(..to_remove);
        to_remove
    }

    /// Moves the buffer to the window `[start, end)` on contig `chrom`.
    ///
    /// Records left of `start` are dropped from the front, and the reader is advanced until a
    /// record at or beyond `end`, a record of a later contig, or the end of input is reached.
    ///
    /// # Arguments
    ///
    /// * `chrom` - name of the target contig, resolved through the reader's header
    /// * `start` - first position of the window (inclusive)
    /// * `end` - end of the window (exclusive)
    ///
    /// # Returns
    ///
    /// `(added, deleted)`: the number of records added to and removed from the buffer by this
    /// call.
    ///
    /// # Errors
    ///
    /// Returns an error if `chrom` cannot be resolved to a rid or if reading a record fails.
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        let rid = self.reader.header.name2rid(chrom)?;
        let mut added = 0;
        let mut deleted = 0;

        // Shrink the active buffer, or switch to the look-ahead buffer when the contig changes.
        match (self.last_rid(), self.next_rid()) {
            (Some(last_rid), _) => {
                if last_rid != rid {
                    // NOTE(review): unlike the branch below, the swapped-in records are not
                    // trimmed against `rid`/`start` here — confirm whether that is intended.
                    deleted = self.ringbuffer.len();
                    self.swap_buffers();
                    added = self.ringbuffer.len();
                } else {
                    deleted = self.drain_left(rid, start);
                }
            }
            (_, Some(_)) => {
                deleted = self.ringbuffer.len();
                self.swap_buffers();
                deleted += self.drain_left(rid, start);
                added = self.ringbuffer.len();
            }
            _ => (),
        }

        if !self.ringbuffer2.is_empty() {
            // The reader has already moved past the requested contig; nothing more to read.
            return Ok((added, deleted));
        }

        // Re-evaluate the record held back by the previous fetch.
        if let Some(held) = self.overflow.take() {
            let pos = held.pos() as u64;
            match held.rid().map(|held_rid| held_rid.cmp(&rid)) {
                Some(Ordering::Equal) => {
                    if pos < start {
                        // Left of the new window: drop it.
                    } else if pos < end {
                        // `end` is exclusive, matching the read loop below.
                        self.ringbuffer.push_back(held);
                        added += 1;
                    } else {
                        // Still right of the window: keep it for a later fetch and stop, since
                        // every unread record comes after it.
                        self.overflow = Some(held);
                        return Ok((added, deleted));
                    }
                }
                Some(Ordering::Greater) => {
                    // The held record belongs to a later contig than the one requested, so the
                    // reader is already past the requested contig. Keep the record and stop.
                    self.overflow = Some(held);
                    return Ok((added, deleted));
                }
                // Stale record from an earlier contig (or without rid): its position is
                // meaningless for this window, so drop it.
                _ => {}
            }
        }

        // Extend to the right.
        loop {
            let mut rec = self.reader.empty_record();
            match self.reader.read(&mut rec) {
                // End of input.
                None => break,
                // Surface read failures instead of processing a possibly invalid record.
                Some(outcome) => outcome?,
            }

            let rec_rid = match rec.rid() {
                Some(rec_rid) => rec_rid,
                // Records without a rid are skipped.
                None => continue,
            };
            let pos = rec.pos() as u64;

            match rec_rid.cmp(&rid) {
                // Record from an earlier contig: skip.
                Ordering::Less => {}
                Ordering::Equal => {
                    if pos >= end {
                        // Beyond the window: hold it back for the next fetch and stop.
                        self.overflow = Some(rec);
                        break;
                    }
                    if pos >= start {
                        self.ringbuffer.push_back(rec);
                        added += 1;
                    }
                    // Otherwise the record is left of the window and is skipped.
                }
                Ordering::Greater => {
                    // First record of a later contig: park it in the look-ahead buffer and stop.
                    self.ringbuffer2.push_back(rec);
                    break;
                }
            }
        }

        Ok((added, deleted))
    }

    /// Iterates over the records currently in the buffer, front to back.
    pub fn iter(&self) -> vec_deque::Iter<'_, bcf::Record> {
        self.ringbuffer.iter()
    }

    /// Iterates mutably over the records currently in the buffer, front to back.
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, bcf::Record> {
        self.ringbuffer.iter_mut()
    }

    /// Number of records currently in the buffer.
    pub fn len(&self) -> usize {
        self.ringbuffer.len()
    }

    /// Returns `true` if the buffer currently holds no records.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bcf;

    /// Two consecutive windows on contig "1" of the bundled test file: the second window must
    /// pick up the record that the first fetch read past its end.
    #[test]
    fn test_buffer() {
        let mut buffer = RecordBuffer::new(bcf::Reader::from_path("test/test.bcf").unwrap());

        buffer.fetch(b"1", 100, 10023).unwrap();
        let first_window: Vec<_> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(first_window, vec![10021, 10022]);

        buffer.fetch(b"1", 10023, 10024).unwrap();
        let second_window: Vec<_> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(second_window, vec![10023]);
    }
}