use std::borrow::Cow;
use std::fs::File;
use std::io::{self, BufRead, Seek};
use std::iter;
use std::path::Path;
use std::slice;
use std::str::{self, Utf8Error};
use buffer_redux;
use memchr::Memchr;
use super::policy::{BufPolicy, StdPolicy};
use super::*;
/// Buffer policy type used by `Reader` when no other policy is specified.
type DefaultPolicy = StdPolicy;
/// Buffer capacity in bytes (64 KiB) that `Reader::new` passes to `Reader::with_capacity`.
const BUFSIZE: usize = 64 * 1024;
/// Buffered FASTA record parser over a reader `R`, with buffer growth decided by policy `P`.
pub struct Reader<R: io::Read, P = DefaultPolicy> {
/// Buffered wrapper around the underlying reader; records are parsed from its buffer.
buf_reader: buffer_redux::BufReader<R>,
/// Offsets (record start and line ends) of the current record within the buffer.
buf_pos: BufferPosition,
/// Line/byte position of the current record; `line` stays 0 until `init` has found a record start.
position: Position,
/// Buffer offset that `next_pos` uses as the start of the following record.
search_pos: usize,
/// Set when no further records will be produced; cleared again by `seek`.
finished: bool,
/// Policy consulted by `grow` to choose the new buffer capacity.
buf_policy: P,
}
impl<R> Reader<R, DefaultPolicy>
where
    R: io::Read,
{
    /// Creates a reader with the default buffer capacity (`BUFSIZE`).
    #[inline]
    pub fn new(reader: R) -> Reader<R, StdPolicy> {
        Self::with_capacity(reader, BUFSIZE)
    }

    /// Creates a reader whose internal buffer starts with the given capacity in bytes.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is smaller than 3.
    #[inline]
    pub fn with_capacity(reader: R, capacity: usize) -> Reader<R, DefaultPolicy> {
        assert!(capacity >= 3);
        let buf_reader = buffer_redux::BufReader::with_capacity(capacity, reader);
        let buf_pos = BufferPosition {
            start: 0,
            seq_pos: Vec::with_capacity(1),
        };
        Reader {
            buf_reader,
            buf_pos,
            // Line 0 marks the reader as not yet initialized.
            position: Position::new(0, 0),
            search_pos: 0,
            finished: false,
            buf_policy: StdPolicy,
        }
    }
}
impl Reader<File, DefaultPolicy> {
    /// Opens the file at `path` and wraps it in a reader with default settings.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `File::open` if the file cannot be opened.
    #[inline]
    pub fn from_path<P: AsRef<Path>>(path: P) -> io::Result<Reader<File>> {
        let file = File::open(path)?;
        Ok(Reader::new(file))
    }
}
impl<R, P> Reader<R, P>
where
R: io::Read,
P: BufPolicy,
{
/// Replaces the buffer policy, returning a reader that keeps all other parsing state.
#[inline]
pub fn set_policy<T: BufPolicy>(self, policy: T) -> Reader<R, T> {
    // Take the reader apart; the previous policy is simply dropped.
    let Reader {
        buf_reader,
        buf_pos,
        position,
        search_pos,
        finished,
        buf_policy: _,
    } = self;
    Reader {
        buf_reader,
        buf_pos,
        position,
        search_pos,
        finished,
        buf_policy: policy,
    }
}
#[inline]
pub fn policy(&self) -> &P {
&self.buf_policy
}
/// Parses the next record and returns a view borrowing from the internal buffer.
///
/// Returns `None` once the reader is finished or the input holds no record,
/// and `Some(Err(_))` if initialization or buffer refilling fails.
#[allow(clippy::should_implement_trait)]
#[inline]
pub fn next(&mut self) -> Option<Result<RefRecord, Error>> {
    if self.finished {
        return None;
    }
    if !self.initialized() {
        // First call: locate the start of the first record.
        let found_start = try_opt!(self.init());
        if !found_start {
            return None;
        }
    }
    if !self.buf_pos.is_new() {
        // A record was returned before; move on to the one following it.
        self.next_pos();
    }
    // Search the buffered data first; only refill/grow if the record is incomplete.
    let complete = try_opt!(self.search()) || try_opt!(self.search_complete());
    if !complete {
        return None;
    }
    Some(Ok(RefRecord {
        buffer: self.get_buf(),
        buf_pos: &self.buf_pos,
    }))
}
/// Fills `rset` with a copy of the current buffer and the positions of the records found in it.
///
/// Existing entries of `rset.positions` are overwritten in place before new ones are
/// appended; `rset.npos` receives the number of valid entries.
/// Returns `None` when the reader is finished (or no record could be found).
#[inline]
pub fn read_record_set(&mut self, rset: &mut RecordSet) -> Option<Result<(), Error>> {
    if self.finished {
        return None;
    }
    if self.initialized() {
        if !try_opt!(self.search_complete()) {
            return None;
        }
    } else {
        if !try_opt!(self.init()) {
            return None;
        }
        if !try_opt!(self.search()) {
            // NOTE(review): `rset` is left untouched on this path (buffer and npos keep
            // their previous values) - confirm this is intended for reused record sets.
            return Some(Ok(()));
        }
    }

    // Copy the buffer only after it has been (re)filled above.
    rset.buffer.clear();
    rset.buffer.extend_from_slice(self.get_buf());

    let mut n = 0;
    loop {
        // Reuse an already allocated slot if there is one, otherwise append.
        if n < rset.positions.len() {
            rset.positions[n].update(&self.buf_pos);
        } else {
            rset.positions.push(self.buf_pos.clone());
        }
        n += 1;
        self.next_pos();
        if self.finished || !try_opt!(self.search()) {
            rset.npos = n;
            return Some(Ok(()));
        }
    }
}
/// Advances the bookkeeping past the current record: adds its line and byte counts to
/// `position` and makes `search_pos` the start of the next record.
fn next_pos(&mut self) {
    let n_lines = self.buf_pos.seq_pos.len() as u64;
    self.position.line += n_lines;
    let n_bytes = (self.search_pos - self.buf_pos.start) as u64;
    self.position.byte += n_bytes;
    // Sets the new start and clears the recorded line ends.
    self.buf_pos.reset(self.search_pos);
}
fn get_buf(&self) -> &[u8] {
self.buf_reader.buffer()
}
/// Reports whether a record start has been located, i.e. the stored line number is non-zero.
fn initialized(&self) -> bool {
    self.position.line > 0
}
#[inline(never)]
fn init(&mut self) -> Result<bool, Error> {
if let Some((line_num, pos, byte)) = self.first_byte()? {
if byte == b'>' {
self.buf_pos.start = pos;
self.position.byte = pos as u64;
self.position.line = line_num as u64;
self.search_pos = pos + 1;
return Ok(true);
} else {
self.finished = true;
return Err(Error::InvalidStart {
line: line_num,
found: byte,
});
}
}
self.finished = true;
Ok(false)
}
/// Skips leading blank lines (empty or consisting only of `\r`) and returns
/// `(line number, buffer offset, first byte)` of the first non-blank line, or `None`
/// if the input ends before one is found. Line numbers are 1-based.
///
/// NOTE(review): the returned offset is relative to the current buffer contents; bytes
/// consumed in earlier loop iterations are not added to it.
fn first_byte(&mut self) -> Result<Option<(usize, usize, u8)>, Error> {
    let mut line_num = 0;
    // Presumably `fill_buf` returns the number of newly read bytes - verify against its definition.
    while fill_buf(&mut self.buf_reader)? > 0 {
        let mut pos = 0;
        let mut last_line_len = 0;
        for line in self.get_buf().split(|b| *b == b'\n') {
            line_num += 1;
            if !line.is_empty() && line != b"\r" {
                return Ok(Some((line_num, pos, line[0])));
            }
            pos += line.len() + 1;
            last_line_len = line.len();
        }
        // `split` always yields a final segment after the last '\n'. That segment is an
        // unterminated line which is seen again (as the first segment) after refilling,
        // so it must not be counted yet; otherwise the reported line number is too high
        // by one for every refill.
        line_num -= 1;
        // Keep the unterminated segment (empty or an orphan '\r') in the buffer.
        self.buf_reader.consume(pos - 1 - last_line_len);
        self.buf_reader.make_room();
    }
    Ok(None)
}
fn search(&mut self, incomplete_pos: Option<usize>) -> Result<bool, Error> {
if self._search(incomplete_pos.unwrap_or(self.buf_pos.start)) {
return Ok(true);
}
if self.get_buf().len() < self.buf_reader.capacity() {
self.finished = true;
self.buf_pos.seq_pos.push(self.search_pos);
return Ok(true);
}
Ok(false)
}
fn _search(&mut self, buf_offset: usize) -> bool {
let bufsize = self.get_buf().len();
for pos in Memchr::new(b'\n', &self.buf_reader.buffer()[buf_offset..]) {
let pos = buf_offset + pos;
let next_line_start = pos + 1;
if next_line_start == bufsize {
self.incomplete_pos = Some(pos); return false;
}
self.buf_pos.seq_pos.push(pos);
if self.get_buf()[next_line_start] == b'>' {
self.incomplete_pos = None;
return true;
}
}
self.incomplete_pos = Some(bufsize);
false
}
/// Repeatedly makes space in the buffer, refills it and searches again until `search`
/// reports a complete record.
///
/// If the current record already starts at offset 0 the buffer is grown; otherwise the
/// data before the record is discarded.
fn search_complete(&mut self) -> Result<bool, Error> {
    loop {
        match self.buf_pos.start {
            // Nothing in front of the record can be discarded: the buffer is too small.
            0 => self.grow()?,
            _ => self.make_room(),
        }
        fill_buf(&mut self.buf_reader)?;
        let found = self.search()?;
        if found {
            return Ok(true);
        }
    }
}
fn grow(&mut self) -> Result<(), Error> {
let cap = self.buf_reader.capacity();
let new_size = self.buf_policy.grow_to(cap).ok_or(Error::BufferLimit)?;
let additional = new_size - cap;
self.buf_reader.reserve(additional);
Ok(())
}
/// Discards the buffered bytes in front of the current record and shifts every stored
/// offset so that the record starts at offset 0.
fn make_room(&mut self) {
    let shift = self.buf_pos.start;
    self.buf_reader.consume(shift);
    self.buf_reader.make_room();
    self.buf_pos.start = 0;
    self.search_pos -= shift;
    self.buf_pos
        .seq_pos
        .iter_mut()
        .for_each(|line_end| *line_end -= shift);
}
/// Returns the position of the current record, or `None` while no line ends are
/// recorded for it (`buf_pos.is_new()`).
#[inline]
pub fn position(&self) -> Option<&Position> {
    if self.buf_pos.is_new() {
        None
    } else {
        Some(&self.position)
    }
}
/// Returns an iterator over owned records that borrows this reader mutably.
#[inline]
pub fn records(&mut self) -> RecordsIter<R, P> {
    let rdr = self;
    RecordsIter { rdr }
}
/// Consumes the reader and returns an iterator over owned records.
#[inline]
pub fn into_records(self) -> RecordsIntoIter<R, P> {
    let rdr = self;
    RecordsIntoIter { rdr }
}
}
impl<R, P> Reader<R, P>
where
    R: io::Read + Seek,
    P: BufPolicy,
{
    /// Repositions the reader at `pos`.
    ///
    /// If the target byte lies within the currently buffered data only the internal
    /// offsets are adjusted; otherwise the underlying reader is seeked to `pos.byte()`
    /// and the buffer is refilled.
    ///
    /// # Errors
    ///
    /// Returns an error if seeking or refilling the buffer fails.
    #[inline]
    pub fn seek(&mut self, pos: &Position) -> Result<(), Error> {
        self.finished = false;
        let diff = pos.byte as i64 - self.position.byte as i64;
        let rel_pos = self.buf_pos.start as i64 + diff;
        let in_buffer = rel_pos >= 0 && rel_pos < (self.get_buf().len() as i64);
        self.position = pos.clone();

        if in_buffer {
            // Target is already buffered: no actual seek required.
            let offset = rel_pos as usize;
            self.search_pos = offset;
            self.buf_pos.reset(offset);
            return Ok(());
        }

        self.search_pos = 0;
        self.buf_reader.seek(io::SeekFrom::Start(pos.byte))?;
        fill_buf(&mut self.buf_reader)?;
        self.buf_pos.reset(0);
        Ok(())
    }
}
/// Iterator over owned records that borrows a `Reader` mutably.
pub struct RecordsIter<'a, R, P = DefaultPolicy>
where
    P: 'a,
    R: io::Read + 'a,
{
    rdr: &'a mut Reader<R, P>,
}

impl<'a, R, P> Iterator for RecordsIter<'a, R, P>
where
    P: BufPolicy + 'a,
    R: io::Read + 'a,
{
    type Item = Result<OwnedRecord, Error>;

    /// Reads the next record from the reader and copies it into an `OwnedRecord`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.rdr.next()?;
        Some(item.map(|rec| rec.to_owned_record()))
    }
}
/// Iterator over owned records that owns its `Reader`.
pub struct RecordsIntoIter<R: io::Read, P = DefaultPolicy> {
    rdr: Reader<R, P>,
}

impl<R, P> Iterator for RecordsIntoIter<R, P>
where
    P: BufPolicy,
    R: io::Read,
{
    type Item = Result<OwnedRecord, Error>;

    /// Reads the next record from the reader and copies it into an `OwnedRecord`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.rdr.next() {
            Some(Ok(rec)) => Some(Ok(rec.to_owned_record())),
            Some(Err(e)) => Some(Err(e)),
            None => None,
        }
    }
}
/// A location in the input, given as a line number and a byte offset.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position {
    line: u64,
    byte: u64,
}

impl Position {
    /// Creates a position from a line number and a byte offset.
    #[inline]
    pub fn new(line: u64, byte: u64) -> Position {
        Self { line, byte }
    }

    /// Returns the line number.
    #[inline]
    pub fn line(&self) -> u64 {
        let Self { line, .. } = *self;
        line
    }

    /// Returns the byte offset.
    #[inline]
    pub fn byte(&self) -> u64 {
        let Self { byte, .. } = *self;
        byte
    }
}
/// Errors reported by the FASTA reader.
#[derive(Debug)]
pub enum Error {
    /// An I/O error from the underlying reader.
    Io(io::Error),
    /// The first non-blank line of the input does not start with `>`.
    InvalidStart {
        /// Line number at which the unexpected byte was found.
        line: usize,
        /// The byte found instead of `>`.
        found: u8,
    },
    /// The buffer policy refused to grow the buffer any further.
    BufferLimit,
}

impl fmt::Display for Error {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Error::Io(e) => fmt::Display::fmt(e, f),
            Error::InvalidStart { line, found } => write!(
                f,
                "FASTA parse error: expected '>' but found '{}' at file start, line {}.",
                (*found as char).escape_default(),
                line
            ),
            Error::BufferLimit => write!(f, "FASTA parse error: buffer limit reached."),
        }
    }
}

impl From<io::Error> for Error {
    #[inline]
    fn from(e: io::Error) -> Error {
        Error::Io(e)
    }
}

impl error::Error for Error {
    /// Exposes the wrapped I/O error, if any, as the underlying cause.
    #[inline]
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        if let Error::Io(err) = self {
            Some(err)
        } else {
            None
        }
    }
}
/// Offsets describing one record inside a byte buffer.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BufferPosition {
    /// Offset at which the record starts.
    start: usize,
    /// Offsets of the line ends recorded for the record so far.
    seq_pos: Vec<usize>,
}

impl BufferPosition {
    /// Returns `true` while no line end has been recorded.
    #[inline]
    fn is_new(&self) -> bool {
        self.seq_pos.first().is_none()
    }

    /// Forgets all recorded line ends and sets a new record start.
    #[inline]
    fn reset(&mut self, start: usize) {
        self.start = start;
        self.seq_pos.clear();
    }

    /// Makes `self` a copy of `other`, reusing the existing allocation.
    #[inline]
    fn update(&mut self, other: &Self) {
        self.seq_pos.clear();
        self.seq_pos.extend_from_slice(&other.seq_pos);
        self.start = other.start;
    }
}
/// Common interface of FASTA records: a header line and a sequence.
pub trait Record {
    /// Returns the header line.
    fn head(&self) -> &[u8];
    /// Returns the sequence.
    fn seq(&self) -> &[u8];
    /// Writes the record to `writer`.
    fn write<W: io::Write>(&self, writer: W) -> io::Result<()>;
    /// Writes the record to `writer`, given a line width `wrap` for the sequence.
    fn write_wrap<W: io::Write>(&self, writer: W, wrap: usize) -> io::Result<()>;

    /// Returns the ID: the header up to (excluding) the first space, or the whole
    /// header if it contains no space.
    #[inline]
    fn id_bytes(&self) -> &[u8] {
        let head = self.head();
        match head.iter().position(|&b| b == b' ') {
            Some(end) => &head[..end],
            None => head,
        }
    }

    /// Returns the ID as `&str`, failing if it is not valid UTF-8.
    #[inline]
    fn id(&self) -> Result<&str, Utf8Error> {
        let id = self.id_bytes();
        str::from_utf8(id)
    }

    /// Returns the description: everything after the first space of the header,
    /// or `None` if the header contains no space.
    #[inline]
    fn desc_bytes(&self) -> Option<&[u8]> {
        let head = self.head();
        head.iter()
            .position(|&b| b == b' ')
            .map(|space| &head[space + 1..])
    }

    /// Returns the description as `&str`, if there is one.
    #[inline]
    fn desc(&self) -> Option<Result<&str, Utf8Error>> {
        let desc = self.desc_bytes()?;
        Some(str::from_utf8(desc))
    }

    /// Returns ID and optional description, split at the first space of the header.
    #[inline]
    fn id_desc_bytes(&self) -> (&[u8], Option<&[u8]>) {
        let head = self.head();
        match head.iter().position(|&b| b == b' ') {
            Some(space) => (&head[..space], Some(&head[space + 1..])),
            None => (head, None),
        }
    }

    /// Like `id_desc_bytes`, but validates the whole header as UTF-8 first.
    #[inline]
    fn id_desc(&self) -> Result<(&str, Option<&str>), Utf8Error> {
        let head = str::from_utf8(self.head())?;
        Ok(match head.find(' ') {
            // A space is one byte wide, so `space + 1` is a valid char boundary.
            Some(space) => (&head[..space], Some(&head[space + 1..])),
            None => (head, None),
        })
    }
}
/// Borrowed view of one record: a byte buffer plus the offsets locating the record in it.
#[derive(Debug, Clone)]
pub struct RefRecord<'a> {
/// Buffer containing the bytes of the record.
buffer: &'a [u8],
/// Offsets of the record start and of its line ends within `buffer`.
buf_pos: &'a BufferPosition,
}
impl<'a> Record for RefRecord<'a> {
    /// Returns the header: the bytes after the record's first byte up to the first
    /// line end, passed through `trim_cr`.
    #[inline]
    fn head(&self) -> &[u8] {
        let head_start = self.buf_pos.start + 1;
        let head_end = *self.buf_pos.seq_pos.first().unwrap();
        trim_cr(&self.buffer[head_start..head_end])
    }

    /// Returns the raw sequence between the first and the last line end (inner line
    /// breaks included), or an empty slice if fewer than two line ends are recorded.
    #[inline]
    fn seq(&self) -> &[u8] {
        let line_ends = &self.buf_pos.seq_pos;
        if line_ends.len() <= 1 {
            return b"";
        }
        let start = line_ends[0] + 1;
        let end = line_ends[line_ends.len() - 1];
        trim_cr(&self.buffer[start..end])
    }

    /// Writes the header followed by all sequence lines joined into a single line.
    #[inline]
    fn write<W: io::Write>(&self, mut writer: W) -> io::Result<()> {
        let lines = self.seq_lines();
        write_head(&mut writer, self.head())?;
        write_seq_iter(&mut writer, lines)
    }

    /// Writes the header followed by the sequence re-wrapped to lines of `wrap` bytes.
    #[inline]
    fn write_wrap<W: io::Write>(&self, mut writer: W, wrap: usize) -> io::Result<()> {
        let lines = self.seq_lines();
        write_head(&mut writer, self.head())?;
        write_wrap_seq_iter(&mut writer, lines, wrap)
    }
}
impl<'a> RefRecord<'a> {
    /// Returns an iterator over the individual sequence lines.
    #[inline]
    pub fn seq_lines(&self) -> SeqLines {
        let line_ends = &self.buf_pos.seq_pos;
        SeqLines {
            data: self.buffer,
            len: line_ends.len() - 1,
            // Pair every line end with the following one: each pair delimits one line.
            pos_iter: line_ends.iter().zip(line_ends.iter().skip(1)),
        }
    }

    /// Returns the number of sequence lines.
    #[inline]
    pub fn num_seq_lines(&self) -> usize {
        let lines = self.seq_lines();
        lines.len()
    }

    /// Returns the sequence without line breaks, borrowing from the buffer when the
    /// record has exactly one sequence line and allocating otherwise.
    #[inline]
    pub fn full_seq(&self) -> Cow<[u8]> {
        match self.num_seq_lines() {
            1 => Cow::Borrowed(self.seq()),
            _ => Cow::Owned(self.owned_seq()),
        }
    }

    /// Returns the sequence lines concatenated into a newly allocated vector.
    #[inline]
    pub fn owned_seq(&self) -> Vec<u8> {
        let mut seq = Vec::new();
        self.seq_lines()
            .for_each(|line| seq.extend_from_slice(line));
        seq
    }

    /// Copies header and sequence into an `OwnedRecord`.
    #[inline]
    pub fn to_owned_record(&self) -> OwnedRecord {
        let head = self.head().to_vec();
        let seq = self.owned_seq();
        OwnedRecord { head, seq }
    }

    /// Writes the record bytes exactly as found in the buffer, appending a newline
    /// if the written data does not already end with one.
    #[inline]
    pub fn write_unchanged<W: io::Write>(&self, mut writer: W) -> io::Result<()> {
        let end = *self.buf_pos.seq_pos.last().unwrap();
        let data = &self.buffer[self.buf_pos.start..end];
        writer.write_all(data)?;
        if *data.last().unwrap() == b'\n' {
            return Ok(());
        }
        writer.write_all(b"\n")
    }
}
/// Iterator over the sequence lines of a record.
///
/// Each item is the slice between two consecutive line ends, passed through `trim_cr`.
pub struct SeqLines<'a> {
    /// Buffer the line offsets refer to.
    data: &'a [u8],
    /// Number of lines at construction time; the remaining count is always taken
    /// from `pos_iter`, this value only serves as an upper bound in debug checks.
    len: usize,
    /// Pairs of (line end, following line end) delimiting each line.
    pos_iter: iter::Zip<slice::Iter<'a, usize>, iter::Skip<slice::Iter<'a, usize>>>,
}

impl<'a> Iterator for SeqLines<'a> {
    type Item = &'a [u8];

    #[inline]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.pos_iter
            .next()
            .map(|(start, next_start)| trim_cr(&self.data[*start + 1..*next_start]))
    }

    /// Reports the exact number of lines that are still to be yielded.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let l = self.len();
        (l, Some(l))
    }
}

impl<'a> DoubleEndedIterator for SeqLines<'a> {
    #[inline]
    fn next_back(&mut self) -> Option<&'a [u8]> {
        self.pos_iter
            .next_back()
            .map(|(start, next_start)| trim_cr(&self.data[*start + 1..*next_start]))
    }
}

impl<'a> ExactSizeIterator for SeqLines<'a> {
    /// Returns the number of lines not yet yielded from either end.
    ///
    /// The count comes from the inner iterator so that it shrinks as items are
    /// consumed; returning the construction-time count would violate the
    /// `ExactSizeIterator` contract after the first call to `next`/`next_back`.
    #[inline]
    fn len(&self) -> usize {
        let remaining = self.pos_iter.len();
        debug_assert!(remaining <= self.len);
        remaining
    }
}
/// A FASTA record that owns its header and sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OwnedRecord {
    /// Header line.
    pub head: Vec<u8>,
    /// Sequence.
    pub seq: Vec<u8>,
}

impl Record for OwnedRecord {
    #[inline]
    fn head(&self) -> &[u8] {
        self.head.as_slice()
    }

    #[inline]
    fn seq(&self) -> &[u8] {
        self.seq.as_slice()
    }

    /// Writes header and sequence via `write_to`.
    #[inline]
    fn write<W: io::Write>(&self, writer: W) -> io::Result<()> {
        let OwnedRecord { head, seq } = self;
        write_to(writer, head, seq)
    }

    /// Writes the header, then the sequence via `write_wrap_seq` with the given `wrap`.
    #[inline]
    fn write_wrap<W: io::Write>(&self, mut writer: W, wrap: usize) -> io::Result<()> {
        let OwnedRecord { head, seq } = self;
        write_head(&mut writer, head)?;
        write_wrap_seq(&mut writer, seq, wrap)
    }
}
/// A buffer copy together with the positions of the records contained in it,
/// as filled by `Reader::read_record_set`.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct RecordSet {
    /// Copy of the reader's buffer.
    buffer: Vec<u8>,
    /// Record positions within `buffer`; may hold more entries than are valid.
    positions: Vec<BufferPosition>,
    /// Number of valid entries at the front of `positions`.
    npos: usize,
}

impl<'a> iter::IntoIterator for &'a RecordSet {
    type Item = RefRecord<'a>;
    type IntoIter = RecordSetIter<'a>;

    /// Iterates over the first `npos` records of the set.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let valid = self.positions.iter().take(self.npos);
        RecordSetIter {
            buffer: self.buffer.as_slice(),
            pos: valid,
        }
    }
}
/// Iterator over the records of a `RecordSet`.
pub struct RecordSetIter<'a> {
    /// Buffer shared by all yielded records.
    buffer: &'a [u8],
    /// Remaining record positions.
    pos: iter::Take<slice::Iter<'a, BufferPosition>>,
}

impl<'a> Iterator for RecordSetIter<'a> {
    type Item = RefRecord<'a>;

    #[inline]
    fn next(&mut self) -> Option<RefRecord<'a>> {
        let buf_pos = self.pos.next()?;
        Some(RefRecord {
            buffer: self.buffer,
            buf_pos,
        })
    }
}
/// Writes a record given its header line and sequence; the sequence goes on one line.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_to<W>(mut writer: W, head: &[u8], seq: &[u8]) -> io::Result<()>
where
    W: io::Write,
{
    write_head(&mut writer, head).and_then(|()| write_seq(&mut writer, seq))
}
/// Writes a record given its ID, optional description and sequence; the sequence
/// goes on one line.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_parts<W>(mut writer: W, id: &[u8], desc: Option<&[u8]>, seq: &[u8]) -> io::Result<()>
where
    W: io::Write,
{
    match write_id_desc(&mut writer, id, desc) {
        Ok(()) => write_seq(&mut writer, seq),
        Err(e) => Err(e),
    }
}
/// Writes a record given its ID, optional description and sequence, breaking the
/// sequence into lines of at most `wrap` bytes.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_wrap<W>(
    mut writer: W,
    id: &[u8],
    desc: Option<&[u8]>,
    seq: &[u8],
    wrap: usize,
) -> io::Result<()>
where
    W: io::Write,
{
    write_id_desc(&mut writer, id, desc).and_then(|()| write_wrap_seq(&mut writer, seq, wrap))
}
/// Writes a header line: `>`, the given header bytes and a newline.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_head<W>(mut writer: W, head: &[u8]) -> io::Result<()>
where
    W: io::Write,
{
    let parts: [&[u8]; 3] = [b">", head, b"\n"];
    for part in parts.iter() {
        writer.write_all(part)?;
    }
    Ok(())
}
/// Writes a header line built from an ID and an optional description, which is
/// separated from the ID by a single space.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_id_desc<W>(mut writer: W, id: &[u8], desc: Option<&[u8]>) -> io::Result<()>
where
    W: io::Write,
{
    writer.write_all(b">")?;
    writer.write_all(id)?;
    match desc {
        Some(text) => {
            writer.write_all(b" ")?;
            writer.write_all(text)?;
        }
        None => {}
    }
    writer.write_all(b"\n")
}
/// Writes the sequence on a single line, terminated by a newline.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_seq<W>(mut writer: W, seq: &[u8]) -> io::Result<()>
where
    W: io::Write,
{
    writer
        .write_all(seq)
        .and_then(|()| writer.write_all(b"\n"))
}
/// Writes the sequence in lines of at most `wrap` bytes, each terminated by a newline.
/// Nothing is written for an empty sequence.
///
/// # Panics
///
/// Panics if `wrap` is 0.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_wrap_seq<W>(mut writer: W, seq: &[u8], wrap: usize) -> io::Result<()>
where
    W: io::Write,
{
    assert!(wrap > 0);
    seq.chunks(wrap).try_for_each(|line| {
        writer.write_all(line)?;
        writer.write_all(b"\n")
    })
}
/// Writes all sequence segments back to back on a single line, terminated by a newline.
///
/// `seq` may be anything that can be turned into an iterator of byte slices (an
/// iterator, a `Vec`, ...), matching what `write_wrap_seq_iter` accepts.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_seq_iter<'a, W, P>(mut writer: W, seq: P) -> io::Result<()>
where
    W: io::Write,
    P: IntoIterator<Item = &'a [u8]>,
{
    for subseq in seq {
        writer.write_all(subseq)?;
    }
    writer.write_all(b"\n")
}
/// Writes the concatenation of all sequence segments in lines of at most `wrap`
/// bytes, regardless of how the input is segmented. A final newline is always written.
///
/// # Panics
///
/// Panics if `wrap` is 0.
///
/// # Errors
///
/// Returns any error reported by `writer`.
#[inline]
pub fn write_wrap_seq_iter<'a, W, P>(mut writer: W, seq: P, wrap: usize) -> io::Result<()>
where
    W: io::Write,
    P: IntoIterator<Item = &'a [u8]>,
{
    assert!(wrap > 0);
    // Number of bytes already written to the current output line.
    let mut col = 0;
    for mut chunk in seq {
        // Emit full lines for as long as the chunk overflows the current line.
        while chunk.len() > wrap - col {
            let (line, rest) = chunk.split_at(wrap - col);
            writer.write_all(line)?;
            writer.write_all(b"\n")?;
            col = 0;
            chunk = rest;
        }
        writer.write_all(chunk)?;
        col += chunk.len();
    }
    writer.write_all(b"\n")
}