use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
use crate::errors::{ErrorPosition, ParseError};
use crate::parser::record::SequenceRecord;
use crate::parser::utils::{
fill_buf, find_line_ending, grow_to, trim_cr, FastxReader, Format, LineEnding, Position,
BUFSIZE,
};
use memchr::memchr;
/// Byte offsets of the current FASTQ record inside the reader's buffer.
///
/// `seq`, `sep` and `qual` are the indices of the first byte of the sequence, separator
/// and quality lines (one past the preceding `\n`). `end` is the index of the `\n` that
/// terminates the quality line, or the buffer length when the input ends without one.
/// The derived `Default` sets every offset to zero; `end == 0` is used to mean
/// "no record located yet".
#[derive(Debug, Clone, Default)]
pub struct BufferPosition {
    /// Index of the record's first byte (the `@` of the header line when well-formed).
    pub(crate) start: usize,
    /// Index of the `\n` ending the quality line (see the type docs for the EOF case).
    pub(crate) end: usize,
    /// Index of the first byte of the sequence line.
    pub(crate) seq: usize,
    /// Index of the first byte of the separator (`+`) line.
    pub(crate) sep: usize,
    /// Index of the first byte of the quality line.
    pub(crate) qual: usize,
}
impl BufferPosition {
    /// Returns `true` while no record has been located yet.
    ///
    /// A located record always ends past index 0, so a zero `end` (the `Default` value)
    /// can only mean "fresh".
    #[inline]
    pub(crate) fn is_new(&self) -> bool {
        self.end == 0
    }

    /// Number of bytes spanned by the record, including the `\n` at `end`.
    ///
    /// Assumes `end` is the index of the record's terminating newline.
    #[inline]
    pub(crate) fn len(&self) -> u64 {
        (self.end + 1 - self.start) as u64
    }

    /// Header text after the leading `@`, without the line terminator.
    ///
    /// The returned slice borrows only from `buffer`, not from `self`.
    #[inline]
    pub(crate) fn id<'a>(&self, buffer: &'a [u8]) -> &'a [u8] {
        trim_cr(&buffer[self.start + 1..self.seq - 1])
    }

    /// Sequence line without its line terminator.
    #[inline]
    pub(crate) fn seq<'a>(&self, buffer: &'a [u8]) -> &'a [u8] {
        trim_cr(&buffer[self.seq..self.sep - 1])
    }

    /// Quality line without its line terminator.
    #[inline]
    pub(crate) fn qual<'a>(&self, buffer: &'a [u8]) -> &'a [u8] {
        trim_cr(&buffer[self.qual..self.end])
    }

    /// Length of the sequence line in bytes (after stripping a trailing `\r`).
    #[inline]
    pub(crate) fn num_bases(&self, buffer: &[u8]) -> usize {
        self.seq(buffer).len()
    }

    /// Detects the line ending used by this record, if any line terminator is present.
    #[inline]
    fn find_line_ending(&self, buffer: &[u8]) -> Option<LineEnding> {
        find_line_ending(self.all(buffer))
    }

    /// The raw record bytes from `start` up to (but excluding) `end`.
    #[inline]
    pub(crate) fn all<'a>(&self, buffer: &'a [u8]) -> &'a [u8] {
        &buffer[self.start..self.end]
    }
}
/// Which line of the current record is still being searched for its terminating `\n`.
///
/// The declaration order is load-bearing. The derived `Ord` follows it, the parser
/// compares variants with `<=` / `>=`, and it casts them with `as u64` (0..=3) to compute
/// error line offsets.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
enum SearchPosition {
    /// The end of the header (`@`) line has not been found yet.
    Id,
    /// The end of the sequence line has not been found yet.
    Sequence,
    /// The end of the separator (`+`) line has not been found yet.
    Separator,
    /// The end of the quality line has not been found yet.
    Quality,
}
/// Streaming FASTQ parser that yields records borrowed from an internal buffer.
pub struct Reader<R: io::Read> {
    /// Buffered source; records are parsed in place from its buffer.
    buf_reader: buffer_redux::BufReader<R>,
    /// Offsets of the record currently being located or returned.
    buf_pos: BufferPosition,
    /// Line at which an interrupted record search should resume.
    search_pos: SearchPosition,
    /// Line and byte position of the current record within the whole input.
    position: Position,
    /// Set once the input is exhausted or a parse error has ended iteration.
    finished: bool,
    /// Line ending detected from the first record returned, if any.
    line_ending: Option<LineEnding>,
}
impl<R> Reader<R>
where
    R: io::Read,
{
    /// Creates a FASTQ reader over `reader` with the default buffer size (`BUFSIZE`).
    pub fn new(reader: R) -> Self {
        Self::with_capacity(reader, BUFSIZE)
    }

    /// Creates a FASTQ reader over `reader` whose initial buffer holds `capacity` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is smaller than 3.
    pub fn with_capacity(reader: R, capacity: usize) -> Self {
        assert!(capacity >= 3);
        let buf_reader = buffer_redux::BufReader::with_capacity(capacity, reader);
        Self {
            buf_reader,
            position: Position::new(1, 0),
            buf_pos: BufferPosition::default(),
            search_pos: SearchPosition::Id,
            line_ending: None,
            finished: false,
        }
    }
}
impl Reader<File> {
    /// Opens the file at `path` and wraps it in a FASTQ reader with the default capacity.
    ///
    /// # Errors
    ///
    /// Returns any I/O error produced by `File::open`.
    pub fn from_path<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let file = File::open(path)?;
        Ok(Self::new(file))
    }
}
impl<R> Reader<R>
where
R: io::Read,
{
#[inline]
fn get_buf(&self) -> &[u8] {
self.buf_reader.buffer()
}
fn find(&mut self) -> Result<bool, ParseError> {
self.buf_pos.seq = if let Some(p) = self.find_line(self.buf_pos.start) {
p
} else {
self.search_pos = SearchPosition::Id;
return Ok(false);
};
self.buf_pos.sep = if let Some(p) = self.find_line(self.buf_pos.seq) {
p
} else {
self.search_pos = SearchPosition::Sequence;
return Ok(false);
};
self.buf_pos.qual = if let Some(p) = self.find_line(self.buf_pos.sep) {
p
} else {
self.search_pos = SearchPosition::Separator;
return Ok(false);
};
self.buf_pos.end = if let Some(p) = self.find_line(self.buf_pos.qual) {
p - 1
} else {
self.search_pos = SearchPosition::Quality;
return Ok(false);
};
self.validate()?;
Ok(true)
}
fn find_incomplete(&mut self) -> Result<bool, ParseError> {
if self.search_pos == SearchPosition::Id {
self.buf_pos.seq = if let Some(p) = self.find_line(self.buf_pos.start) {
p
} else {
self.search_pos = SearchPosition::Id;
return Ok(false);
};
}
if self.search_pos <= SearchPosition::Sequence {
self.buf_pos.sep = if let Some(p) = self.find_line(self.buf_pos.seq) {
p
} else {
self.search_pos = SearchPosition::Sequence;
return Ok(false);
};
}
if self.search_pos <= SearchPosition::Separator {
self.buf_pos.qual = if let Some(p) = self.find_line(self.buf_pos.sep) {
p
} else {
self.search_pos = SearchPosition::Separator;
return Ok(false);
};
}
if self.search_pos <= SearchPosition::Quality {
self.buf_pos.end = if let Some(p) = self.find_line(self.buf_pos.qual) {
p - 1
} else {
self.search_pos = SearchPosition::Quality;
return Ok(false);
};
}
self.search_pos = SearchPosition::Id;
self.validate()?;
Ok(true)
}
fn validate(&mut self) -> Result<(), ParseError> {
let start_byte = self.get_buf()[self.buf_pos.start];
if start_byte != b'@' {
self.finished = true;
return Err(ParseError::new_invalid_start(
start_byte,
self.get_error_pos(0, false),
Format::Fastq,
));
}
let sep_byte = self.get_buf()[self.buf_pos.sep];
if sep_byte != b'+' {
self.finished = true;
return Err(ParseError::new_invalid_separator(
sep_byte,
self.get_error_pos(2, true),
));
}
let buf = self.get_buf();
let seq_len = self.buf_pos.seq(buf).len();
let qual_len = self.buf_pos.qual(buf).len();
if seq_len != qual_len {
self.finished = true;
return Err(ParseError::new_unequal_length(
seq_len,
qual_len,
self.get_error_pos(0, true),
));
}
Ok(())
}
fn get_error_pos(&self, line_offset: u64, parse_id: bool) -> ErrorPosition {
let id = if parse_id && self.buf_pos.seq - self.buf_pos.start > 1 {
let id = self
.buf_pos
.id(self.get_buf())
.split(|b| *b == b' ')
.next()
.unwrap();
Some(String::from_utf8_lossy(id).into())
} else {
None
};
ErrorPosition {
line: self.position.line() + line_offset,
id,
}
}
#[inline]
fn find_line(&self, search_start: usize) -> Option<usize> {
memchr(b'\n', &self.get_buf()[search_start..]).map(|pos| search_start + pos + 1)
}
fn next_complete(&mut self) -> Result<bool, ParseError> {
loop {
if self.get_buf().len() < self.buf_reader.capacity() {
return self.check_end();
}
if self.buf_pos.start == 0 {
self.grow();
} else {
self.make_room();
}
fill_buf(&mut self.buf_reader)?;
if self.find_incomplete()? {
return Ok(true);
}
}
}
fn check_end(&mut self) -> Result<bool, ParseError> {
self.finished = true;
if self.search_pos == SearchPosition::Quality {
self.buf_pos.end = self.get_buf().len();
self.validate()?;
return Ok(true);
}
let rest = &self.get_buf()[self.buf_pos.start..];
if rest.split(|c| *c == b'\n').all(|l| trim_cr(l).is_empty()) {
return Ok(false);
}
Err(ParseError::new_unexpected_end(
self.get_error_pos(self.search_pos as u64, self.search_pos > SearchPosition::Id),
Format::Fastq,
))
}
fn grow(&mut self) {
let cap = self.buf_reader.capacity();
let new_size = grow_to(cap);
let additional = new_size - cap;
self.buf_reader.reserve(additional);
}
fn make_room(&mut self) {
let consumed = self.buf_pos.start;
self.buf_reader.consume(consumed);
self.buf_reader.make_room();
self.buf_pos.start = 0;
if self.search_pos >= SearchPosition::Sequence {
self.buf_pos.seq -= consumed;
}
if self.search_pos >= SearchPosition::Separator {
self.buf_pos.sep -= consumed;
}
if self.search_pos >= SearchPosition::Quality {
self.buf_pos.qual -= consumed;
}
}
}
impl<R: io::Read + Send> FastxReader for Reader<R> {
    /// Parses the next FASTQ record.
    ///
    /// Returns `None` at end of input or after a previous parse error.
    fn next(&mut self) -> Option<Result<SequenceRecord<'_>, ParseError>> {
        if self.finished {
            return None;
        }

        // Prime the buffer on the first call; a zero-byte read means the input is empty.
        if self.get_buf().is_empty() {
            match fill_buf(&mut self.buf_reader) {
                Err(err) => return Some(Err(err.into())),
                Ok(0) => {
                    self.finished = true;
                    return None;
                }
                Ok(_) => {}
            }
        }

        // Step past the previously returned record (always four lines) before searching.
        if !self.buf_pos.is_new() {
            self.position.byte += self.buf_pos.len();
            self.position.line += 4;
            self.buf_pos.start = self.buf_pos.end + 1;
        }

        // Try the bytes already buffered first; fall back to refilling on a partial record.
        let found = match self.find() {
            Ok(true) => true,
            Ok(false) => match self.next_complete() {
                Ok(got) => got,
                Err(err) => return Some(Err(err)),
            },
            Err(err) => return Some(Err(err)),
        };
        if !found {
            return None;
        }

        // The line ending is detected once, from the first record returned.
        if self.line_ending.is_none() {
            self.line_ending = self.buf_pos.find_line_ending(self.get_buf());
        }

        Some(Ok(SequenceRecord::new_fastq(
            self.get_buf(),
            &self.buf_pos,
            &self.position,
            self.line_ending,
        )))
    }

    /// Position (line and byte) of the most recently returned record.
    fn position(&self) -> &Position {
        &self.position
    }

    /// Line ending detected from the first record, if one has been read.
    fn line_ending(&self) -> Option<LineEnding> {
        self.line_ending
    }
}
#[cfg(test)]
mod test {
    use std::io::Cursor;
    use super::Reader;
    use crate::errors::ParseErrorKind;
    use crate::parser::utils::LineEnding;
    use crate::FastxReader;

    /// Wraps an in-memory byte string so it can be fed to `Reader::new`.
    fn seq(s: &[u8]) -> Cursor<&[u8]> {
        Cursor::new(s)
    }

    /// Two well-formed records, checked with both Unix and Windows line endings; the last
    /// record has no trailing newline.
    #[test]
    fn test_simple_fastq() {
        let sequences = vec![
            (
                "@test\nAGCT\n+test\n~~a!\n@test2\nTGCA\n+test\nWUI9",
                LineEnding::Unix,
            ),
            (
                "@test\r\nAGCT\r\n+test\r\n~~a!\r\n@test2\r\nTGCA\r\n+test\r\nWUI9",
                LineEnding::Windows,
            ),
        ];
        for (sequence, line_ending) in sequences {
            let mut i = 0;
            let mut reader = Reader::new(seq(sequence.as_bytes()));
            while let Some(record) = reader.next() {
                let rec = record.unwrap();
                match i {
                    0 => {
                        assert_eq!(&rec.id(), b"test");
                        assert_eq!(&rec.raw_seq(), b"AGCT");
                        assert_eq!(&rec.qual().unwrap(), b"~~a!");
                        assert_eq!(reader.line_ending().unwrap(), line_ending);
                    }
                    1 => {
                        assert_eq!(&rec.id(), b"test2");
                        assert_eq!(&rec.raw_seq(), b"TGCA");
                        assert_eq!(&rec.qual().unwrap(), b"WUI9");
                        assert_eq!(reader.line_ending().unwrap(), line_ending);
                    }
                    _ => unreachable!("Too many records"),
                }
                i += 1;
            }
            assert_eq!(i, 2);
        }
    }

    /// A quality line truncated by EOF is reported as a length mismatch.
    #[test]
    fn test_eof_in_qual() {
        let mut reader = Reader::new(seq(b"@test\nACGT\n+\nIII"));
        let rec = reader.next().unwrap();
        assert!(rec.is_err());
        let e = rec.unwrap_err();
        assert_eq!(e.kind, ParseErrorKind::UnequalLengths);
    }

    /// EOF before the separator of the second record is an unexpected end.
    #[test]
    fn test_eof_in_seq() {
        let mut reader = Reader::new(seq(b"@test\nAGCT\n+test\n~~a!\n@test2\nTGCA"));
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let rec2 = reader.next().unwrap();
        assert!(rec2.is_err());
        let e = rec2.unwrap_err();
        assert_eq!(e.kind, ParseErrorKind::UnexpectedEnd);
    }

    /// A blank line after the last record ends iteration cleanly.
    #[test]
    fn test_extra_empty_newlines_at_end_are_ok() {
        let mut reader = Reader::new(seq(b"@test\nAGCT\n+test\n~~a!\n\n"));
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        assert!(reader.next().is_none());
    }

    /// A blank line *between* records is not skipped: the next record no longer starts
    /// with `@`.
    #[test]
    fn test_extra_non_empty_newlines_at_end_are_not_ok() {
        let mut reader = Reader::new(seq(b"@test\nAGCT\n+test\n~~a!\n\n@TEST\nA\n+TEST\n~"));
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let rec2 = reader.next().unwrap();
        let e = rec2.unwrap_err();
        assert_eq!(e.kind, ParseErrorKind::InvalidStart);
    }

    /// Records with empty id, sequence and quality are valid; `all()` excludes the final
    /// newline.
    #[test]
    fn test_empty_records() {
        let mut reader = Reader::new(seq(b"@\n\n+\n\n@test2\nTGCA\n+test2\n~~~~\n"));
        let mut i = 0;
        while let Some(record) = reader.next() {
            let rec = record.unwrap();
            match i {
                0 => {
                    assert_eq!(&rec.id(), b"");
                    assert_eq!(&rec.raw_seq(), b"");
                    assert_eq!(&rec.qual().unwrap(), b"");
                    assert_eq!(rec.all(), b"@\n\n+\n");
                }
                1 => {
                    assert_eq!(&rec.id(), b"test2");
                    assert_eq!(&rec.raw_seq(), b"TGCA");
                    assert_eq!(&rec.qual().unwrap(), b"~~~~");
                    assert_eq!(rec.all(), b"@test2\nTGCA\n+test2\n~~~~");
                }
                _ => unreachable!("Too many records"),
            }
            i += 1;
        }
        assert_eq!(i, 2);
    }

    /// An empty record sandwiched between long ones; start line numbers advance by four.
    #[test]
    fn test_weird_ncbi_file() {
        let test = b"@NCBI actually has files like this\nACGTACGATCGTACGTAGCTGCTAGCTAGCATGCATGACACACACGTACGATCGTACGTAGCTGCTAGCTAGCATGCATGACACAC\n+\n00000000000000000000000000000000000000000000000000000000000000000000000000000000000000\n@NCBI actually has files like this\n\n+\n\n@NCBI actually has files like this\nACGTACGATCGTACGTAGCTGCTAGCTAGCATGCATGACACACACGTACGATCGTACGTAGCTGCTAGCTAGCATGCATGACACAC\n+\n00000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
        let mut reader = Reader::new(seq(test));
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let r = rec.unwrap();
        assert_eq!(r.start_line_number(), 1);
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let r = rec.unwrap();
        assert_eq!(r.start_line_number(), 5);
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let r = rec.unwrap();
        assert_eq!(r.start_line_number(), 9);
    }

    /// A quality line shorter than the sequence in a complete record is rejected.
    #[test]
    fn test_mismatched_lengths() {
        let mut reader = Reader::new(seq(b"@test\nAGCT\n+\nIII\n@TEST\nA\n+\nI"));
        let rec = reader.next().unwrap();
        assert!(rec.is_err());
        let e = rec.unwrap_err();
        assert_eq!(e.kind, ParseErrorKind::UnequalLengths);
    }

    /// Fixture-based: the second record in the file is malformed (depends on test data).
    #[test]
    fn test_bad_headers() {
        let mut reader = Reader::from_path("tests/data/bad_header.fastq").unwrap();
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let rec2 = reader.next().unwrap();
        let e = rec2.unwrap_err();
        assert_eq!(e.kind, ParseErrorKind::UnexpectedEnd);
    }

    /// Fixture-based: non-FASTQ data after the first record fails the separator check.
    #[test]
    fn test_fastq_with_random_tsv_inside() {
        let mut reader = Reader::from_path("tests/data/random_tsv.fq").unwrap();
        let rec = reader.next().unwrap();
        assert!(rec.is_ok());
        let rec2 = reader.next().unwrap();
        let e = rec2.unwrap_err();
        assert_eq!(e.kind, ParseErrorKind::InvalidSeparator);
    }
}