use std::io::{Result, Read, Error, ErrorKind, Cursor};
use std::thread;
use std::sync::mpsc::{SyncSender, sync_channel};
use std::sync::Arc;
use std::iter::FromIterator;
use std::path::Path;
use lz4::Decoder;
use flate2::read::MultiGzDecoder;
extern crate memchr;
extern crate lz4;
extern crate flate2;
mod thread_reader;
mod buffer;
mod records;
pub use thread_reader::thread_reader;
pub use records::{RefRecord, Record, OwnedRecord};
use records::{IdxRecord, IdxRecordResult};
/// Size of the parser's internal read buffer.
///
/// Kept tiny under fuzzing so buffer-boundary and refill code paths are
/// exercised constantly.
#[cfg(fuzzing)]
const BUFSIZE: usize = 64;
/// Size of the parser's internal read buffer. A single FASTQ record must
/// fit into this buffer, otherwise parsing fails with `InvalidData`.
#[cfg(not(fuzzing))]
const BUFSIZE: usize = 68 * 1024;
/// A FASTQ parser reading records from an underlying `Read` stream
/// through an internal buffer of `BUFSIZE` bytes.
pub struct Parser<R: Read> {
    reader: R,
    buffer: buffer::Buffer,
}
/// Open `path` (or stdin when `None`), transparently decompress lz4 or
/// gzip input based on the leading magic bytes, and run `func` on a
/// `Parser` over the decoded stream.
///
/// Compressed input is decoded in a background reader thread; plain
/// FASTQ (first byte `@`) is parsed directly.
///
/// # Errors
///
/// Returns an I/O error if the file cannot be opened or read, and
/// `InvalidData` if the stream is neither gzip, lz4 nor plain FASTQ.
pub fn parse_path<P, F, O>(path: Option<P>, func: F) -> Result<O>
    where P: AsRef<Path>,
          F: FnOnce(Parser<&mut Read>) -> O
{
    let mut reader: Box<Read + Send> = match path {
        None => Box::new(std::io::stdin()),
        Some(path) => Box::new(std::fs::File::open(path)?),
    };
    // Sniff the first four bytes, then splice them back in front of the
    // remaining stream so the decoders see the complete input.
    let mut magic_bytes = [0u8; 4];
    reader.read_exact(&mut magic_bytes)?;
    let mut reader = Cursor::new(magic_bytes.to_vec()).chain(reader);
    // lz4 frame magic number 0x184D2204, little-endian byte order.
    // Safe byte comparison replaces the previous `transmute`-based check.
    if magic_bytes == [0x04, 0x22, 0x4D, 0x18] {
        let bufsize = 1 << 22;
        let queuelen = 2;
        Ok(thread_reader(bufsize, queuelen, Decoder::new(reader)?, |mut reader| {
            func(Parser::new(&mut reader))
        }).expect("lz4 reader thread paniced"))
    } else if &magic_bytes[..2] == b"\x1f\x8b" {
        // gzip member header magic bytes.
        let bufsize = 1 << 22;
        let queuelen = 2;
        let reader = MultiGzDecoder::new(reader);
        Ok(thread_reader(bufsize, queuelen, reader, |mut reader| {
            func(Parser::new(&mut reader))
        }).expect("gzip reader thread paniced"))
    } else if magic_bytes[0] == b'@' {
        Ok(func(Parser::new(&mut reader)))
    } else {
        Err(Error::new(ErrorKind::InvalidData, "Not a gzip, lz4 or plain fastq file"))
    }
}
impl<R: Read> Parser<R> {
    /// Create a new parser reading FASTQ records from `reader`.
    pub fn new(reader: R) -> Parser<R> {
        Parser {
            reader: reader,
            buffer: buffer::Buffer::new(BUFSIZE),
        }
    }

    /// Consume the parser, returning a cursor over records that borrow
    /// from the parser's internal buffer.
    pub fn ref_iter(self) -> RecordRefIter<R> {
        RecordRefIter {
            parser: self,
            current_length: None,
            current: None,
        }
    }

    /// Call `func` on every record in the input.
    ///
    /// Iteration stops early when `func` returns `false`. Returns
    /// `Ok(true)` if the whole input was consumed, `Ok(false)` if `func`
    /// aborted, or an error for unreadable/malformed input.
    #[inline]
    pub fn each<F>(self, mut func: F) -> Result<bool>
        where F: FnMut(RefRecord) -> bool
    {
        let mut iter = self.ref_iter();
        loop {
            iter.advance()?;
            match iter.get() {
                // `None` from `get` after `advance` means clean EOF.
                None => return Ok(true),
                Some(record) => {
                    if !func(record) {
                        return Ok(false);
                    }
                }
            }
        }
    }
}
/// Cursor over records borrowed from a `Parser`'s internal buffer.
///
/// `current` holds the indices of the current record inside the buffer;
/// `current_length` is the byte length to consume from the buffer on the
/// next call to `advance`.
pub struct RecordRefIter<R: Read> {
    parser: Parser<R>,
    current: Option<IdxRecord>,
    current_length: Option<usize>,
}
impl<R: Read> RecordRefIter<R> {
    /// Return the record found by the last `advance`, or `None` at EOF
    /// (or before the first `advance`).
    pub fn get(&self) -> Option<RefRecord> {
        match self.current {
            None => None,
            Some(ref rec) => Some(rec.to_ref_record(self.parser.buffer.data()))
        }
    }

    /// Move to the next record, refilling the buffer from the reader as
    /// needed. After a successful call, `get()` yields the new record,
    /// or `None` once the input is exhausted.
    pub fn advance(&mut self) -> Result<()> {
        let mut buffer = &mut self.parser.buffer;
        let mut reader = &mut self.parser.reader;
        // Release the bytes of the previously returned record first.
        if let Some(len) = self.current_length.take() {
            buffer.consume(len);
        }
        loop {
            match IdxRecord::from_buffer(buffer.data()) {
                Err(e) => { return Err(e) },
                Ok(IdxRecordResult::EmptyBuffer) => {
                    // Nothing buffered: compact and refill. Reading zero
                    // bytes here means clean end of input.
                    buffer.clean();
                    match buffer.read_into(reader) {
                        Err(e) => { return Err(e) },
                        Ok(0) => {
                            self.current = None;
                            self.current_length = None;
                            return Ok(())
                        },
                        _ => { continue }
                    }
                },
                Ok(IdxRecordResult::Incomplete) => {
                    // Partial record at the buffer end: compact and read
                    // more. A full buffer means the record cannot fit in
                    // BUFSIZE; EOF here means the file was truncated.
                    buffer.clean();
                    if buffer.n_free() == 0 {
                        return Err(Error::new(ErrorKind::InvalidData,
                                              "Fastq record is too long"));
                    }
                    match buffer.read_into(reader) {
                        Err(e) => { return Err(e) }
                        Ok(0) => {
                            return Err(Error::new(ErrorKind::InvalidData,
                                                  "Possibly truncated input file"));
                        },
                        _ => { continue }
                    }
                },
                Ok(IdxRecordResult::Record(record)) => {
                    // Found a complete record; remember its byte length so
                    // the next `advance` can consume it from the buffer.
                    let length = record.data.1.checked_sub(record.data.0).unwrap();
                    self.current = Some(record);
                    self.current_length = Some(length);
                    return Ok(());
                }
            }
        }
    }
}
/// A batch of parsed records backed by one owned buffer, suitable for
/// sending to worker threads as a unit.
#[derive(Debug)]
pub struct RecordSet {
    buffer: Box<[u8]>,
    records: Vec<IdxRecord>,
}
impl RecordSet {
    /// Build a record set from an owned buffer and the index records
    /// pointing into it.
    fn from_records(buffer: Box<[u8]>, records: Vec<IdxRecord>) -> RecordSet {
        RecordSet { buffer, records }
    }

    /// Number of records in this set.
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Whether this set contains no records at all.
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }

    /// Iterate over the records of this set as borrowed `RefRecord`s.
    pub fn iter<'a>(&'a self) -> RecordSetItems<'a> {
        RecordSetItems {
            idx_records: self.records.iter(),
            buffer: &self.buffer,
        }
    }
}
/// Iterator over the records of a `RecordSet`, borrowing its buffer.
pub struct RecordSetItems<'a> {
    idx_records: ::std::slice::Iter<'a, IdxRecord>,
    buffer: &'a [u8],
}
impl<'a> Iterator for RecordSetItems<'a> {
    type Item = RefRecord<'a>;

    /// Yield the next record, resolved against the set's shared buffer.
    #[inline]
    fn next(&mut self) -> Option<RefRecord<'a>> {
        let buffer = self.buffer;
        self.idx_records.next().map(|idx_record| idx_record.to_ref_record(buffer))
    }
}
/// Iterator producing `RecordSet`s from a `Parser`, swapping in a fresh
/// buffer for each set.
struct RecordSetIter<R: Read> {
    parser: Parser<R>,
    // Capacity hint for the next set's record vector, updated from the
    // previous set's size.
    num_records_guess: usize,
    // Set once the underlying reader has returned EOF.
    reader_at_end: bool,
}
impl<R: Read> Iterator for RecordSetIter<R> {
    type Item = Result<RecordSet>;

    /// Parse records out of the current buffer until it is exhausted,
    /// then ship the buffer (plus the record indices into it) out as a
    /// `RecordSet` while a fresh buffer is refilled from the reader.
    fn next(&mut self) -> Option<Result<RecordSet>> {
        if self.reader_at_end {
            return None
        }
        let mut records: Vec<IdxRecord> = Vec::with_capacity(self.num_records_guess);
        loop {
            let parse_result = {
                match IdxRecord::from_buffer(self.parser.buffer.data()) {
                    Ok(val) => { val },
                    Err(e) => { return Some(Err(e)) }
                }
            };
            // Offset of the unconsumed region inside the buffer; parsed
            // record indices are relative to it and must be shifted to
            // absolute buffer positions before being stored.
            let buffer_pos = self.parser.buffer.pos();
            use IdxRecordResult::*;
            match parse_result {
                EmptyBuffer => {
                    // Buffer fully parsed: swap in a fresh buffer, refill
                    // it, and return the old one with its records. Zero
                    // bytes read marks clean EOF for the next call.
                    self.num_records_guess = records.len() + 1;
                    let buffer = vec![0u8; BUFSIZE].into_boxed_slice();
                    let buffer = self.parser.buffer.replace_buffer(buffer);
                    match self.parser.buffer.read_into(&mut self.parser.reader) {
                        Err(e) => { return Some(Err(e)) },
                        Ok(0) => { self.reader_at_end = true },
                        _ => { }
                    }
                    return Some(Ok(RecordSet::from_records(buffer, records)))
                }
                Incomplete => {
                    // Partial record at the buffer end: carry it into the
                    // fresh buffer and read more. A full buffer means the
                    // record can't fit in BUFSIZE; EOF here means the
                    // input was truncated.
                    self.num_records_guess = records.len() + 1;
                    let buffer = vec![0u8; BUFSIZE].into_boxed_slice();
                    let buffer = self.parser.buffer.replace_buffer(buffer);
                    if self.parser.buffer.n_free() == 0 {
                        return Some(Err(Error::new(ErrorKind::InvalidData,
                                                   "Fastq record is too long.")))
                    }
                    match self.parser.buffer.read_into(&mut self.parser.reader) {
                        Err(e) => { return Some(Err(e)) }
                        Ok(0) => { return Some(Err(Error::new(ErrorKind::InvalidData,
                                                              "Truncated input file."))) }
                        _ => { }
                    }
                    return Some(Ok(RecordSet::from_records(buffer, records)))
                }
                Record(mut record) => {
                    // Shift the record's indices to absolute positions,
                    // stash it, and consume its bytes from the buffer.
                    record.data.0 += buffer_pos;
                    record.data.1 += buffer_pos;
                    let (start, end) = (record.data.0, record.data.1);
                    records.push(record);
                    self.parser.buffer.consume(end - start);
                }
            }
        }
    }
}
impl<R: Read> Parser<R> {
    /// Consume the parser, yielding whole `RecordSet`s instead of single
    /// records.
    fn record_sets(self) -> RecordSetIter<R> {
        RecordSetIter {
            parser: self,
            num_records_guess: 100,
            reader_at_end: false,
        }
    }

    /// Parse the input on `n_threads` worker threads.
    ///
    /// Each worker receives `RecordSet`s over a bounded channel and runs
    /// `func` on that stream of sets; the workers' return values are
    /// collected into `S`. Sets are handed out round-robin. The first
    /// I/O error hit while reading, if any, is returned.
    pub fn parallel_each<O, S, F>(self, n_threads: usize, func: F) -> Result<S>
        where
            S: FromIterator<O>,
            O: Send + 'static,
            F: Send + Sync + 'static,
            F: Fn(Box<Iterator<Item=RecordSet>>) -> O,
    {
        let func = Arc::new(func);
        let mut senders: Vec<SyncSender<_>> = Vec::new();
        let mut threads: Vec<thread::JoinHandle<_>> = Vec::new();
        for thread_id in 0..n_threads {
            let (tx, rx): (SyncSender<RecordSet>, _) = sync_channel(10);
            let func = func.clone();
            let handle = thread::Builder::new()
                .name(format!("worker-{}", thread_id))
                .spawn(move || func(Box::new(rx.into_iter())))
                .expect("Could not start worker threads");
            senders.push(tx);
            threads.push(handle);
        }
        // Feed record sets round-robin to the workers; stop on the first
        // read error or as soon as any worker hangs up its receiver.
        let mut io_error = None;
        for (record_set, sender) in self.record_sets().zip(senders.iter().cycle()) {
            match record_set {
                Ok(records) => {
                    if sender.send(records).is_err() {
                        break;
                    }
                },
                Err(e) => {
                    io_error = Some(e);
                    break;
                }
            }
        }
        // Close every channel so the workers see end-of-input ...
        ::std::mem::drop(senders);
        // ... then join ALL workers before propagating any panic, so no
        // thread is left detached.
        let results = threads.into_iter()
            .map(|handle| handle.join())
            .collect::<Vec<_>>()
            .into_iter()
            .map(|joined| joined.expect("Panic in worker thread"))
            .collect();
        match io_error {
            Some(e) => Err(e),
            None => Ok(results),
        }
    }
}
/// Step through two FASTQ inputs in lockstep.
///
/// `callback` receives the current record of each input (`None` once
/// that input is exhausted) and returns a pair of flags saying whether
/// each side should advance. Iteration ends when both inputs are
/// exhausted, or when the callback requests no advancement at all; the
/// returned pair reports which inputs were finished at that point.
pub fn each_zipped<R1, R2, F>(parser1: Parser<R1>, parser2: Parser<R2>, mut callback: F)
    -> Result<(bool, bool)>
    where
        R1: Read,
        R2: Read,
        F: FnMut(Option<RefRecord>, Option<RefRecord>) -> (bool, bool)
{
    let mut left = parser1.ref_iter();
    let mut right = parser2.ref_iter();
    let mut finished = (false, false);
    left.advance()?;
    right.advance()?;
    loop {
        let advance = {
            let rec1 = if finished.0 { None } else { left.get() };
            let rec2 = if finished.1 { None } else { right.get() };
            finished = (rec1.is_none(), rec2.is_none());
            callback(rec1, rec2)
        };
        if advance == (false, false) || finished == (true, true) {
            return Ok(finished);
        }
        if advance.0 && !finished.0 {
            left.advance()?;
        }
        if advance.1 && !finished.1 {
            right.advance()?;
        }
    }
}
#[cfg(test)]
mod tests {
    use std::io::{Cursor, Write, Seek, SeekFrom, ErrorKind};
    use super::{Parser, Record};

    // Two well-formed records: heads/seqs/quals are parsed correctly and
    // both RefRecord and OwnedRecord serialize back byte-for-byte.
    #[test]
    fn correct() {
        let data = Cursor::new(b"@hi\nNN\n+\n++\n@hallo\nTCC\n+\nabc\n");
        let parser = Parser::new(data);
        let mut i: u64 = 0;
        let ok = parser.each(move |record| {
            if i == 0 {
                assert_eq!(record.head(), b"hi");
                assert_eq!(record.seq(), b"NN");
                assert_eq!(record.qual(), b"++");
                let record = record.to_owned_record();
                assert_eq!(record.head(), b"hi");
                assert_eq!(record.seq(), b"NN");
                assert_eq!(record.qual(), b"++");
                let mut out = Cursor::new(Vec::new());
                assert_eq!(record.write(&mut out).unwrap(), 12);
                assert_eq!(&out.into_inner()[..], b"@hi\nNN\n+\n++\n");
            } else {
                assert_eq!(record.head(), b"hallo");
                assert_eq!(record.seq(), b"TCC");
                assert_eq!(record.qual(), b"abc");
                let record = record.to_owned_record();
                assert_eq!(record.head(), b"hallo");
                assert_eq!(record.seq(), b"TCC");
                assert_eq!(record.qual(), b"abc");
                let mut out = Cursor::new(Vec::new());
                assert_eq!(record.write(&mut out).unwrap(), 17);
                assert_eq!(&out.into_inner()[..], b"@hallo\nTCC\n+\nabc\n");
            }
            assert!(i < 2);
            i += 1;
            true
        });
        ok.unwrap();
    }

    // A record with an empty id line (`@` alone) is still valid.
    #[test]
    fn empty_id() {
        let data = Cursor::new(b"@\nNN\n+\n++\n");
        let parser = Parser::new(data);
        parser.each(|record| {
            assert_eq!(record.head(), b"");
            assert_eq!(record.seq(), b"NN");
            assert_eq!(record.qual(), b"++");
            true
        }).unwrap();
    }

    // Input ending mid-record must fail with InvalidData, after the
    // preceding complete record was delivered.
    #[test]
    fn missing_lines() {
        let data = Cursor::new(b"@hi\nNN\n+\n++\n@hi\nNN");
        let parser = Parser::new(data);
        let ok = parser.each(|record| {
            assert_eq!(record.head(), b"hi");
            assert_eq!(record.seq(), b"NN");
            assert_eq!(record.qual(), b"++");
            true
        });
        match ok {
            Err(e) => { assert!(e.kind() == ErrorKind::InvalidData) },
            Ok(_) => { panic!("should fail") },
        }
    }

    // A record missing its final newline is treated as truncated; no
    // record is ever delivered to the callback.
    #[test]
    fn truncated() {
        let data = Cursor::new(b"@hi\nNN\n+\n++");
        let parser = Parser::new(data);
        let ok = parser.each(|_| { assert!(false); true });
        assert!(ok.is_err());
    }

    // A repeated id on the `+` separator line is accepted and preserved
    // on re-serialization.
    #[test]
    fn second_idline() {
        let data = Cursor::new(b"@hi\nNN\n+hi\n++\n@hi\nNN\n+hi\n++\n");
        let parser = Parser::new(data);
        let ok = parser.each(|record| {
            assert_eq!(record.head(), b"hi");
            assert_eq!(record.seq(), b"NN");
            assert_eq!(record.qual(), b"++");
            let mut out = Cursor::new(Vec::new());
            assert_eq!(record.write(&mut out).unwrap(), 14);
            assert_eq!(&out.into_inner()[..], b"@hi\nNN\n+hi\n++\n");
            true
        });
        ok.unwrap();
    }

    // CRLF line endings parse like LF; the `\r` is stripped from fields.
    #[test]
    fn windows_lineend() {
        let data = Cursor::new(b"@hi\r\nNN\r\n+\r\n++\r\n@hi\r\nNN\r\n+\r\n++\r\n");
        let parser = Parser::new(data);
        let ok = parser.each(|record| {
            assert_eq!(record.head(), b"hi");
            assert_eq!(record.seq(), b"NN");
            assert_eq!(record.qual(), b"++");
            true
        });
        ok.unwrap();
    }

    // seq and qual of different lengths must be rejected.
    #[test]
    fn length_mismatch() {
        let data = Cursor::new(b"@hi\nNN\n+\n+\n");
        let parser = Parser::new(data);
        let ok = parser.each(|_| { assert!(false); true });
        assert!(ok.is_err());
    }

    // A single record larger than BUFSIZE must error ("record too long")
    // rather than loop or panic.
    #[test]
    fn huge_incomplete() {
        let mut data: Cursor<Vec<u8>> = Cursor::new(vec![]);
        data.write(b"@").unwrap();
        for _ in 0..super::BUFSIZE {
            data.write(b"longid").unwrap();
        };
        data.seek(SeekFrom::Start(0)).unwrap();
        let parser = Parser::new(data);
        assert!(parser.each(|_| true).is_err());
    }

    // A record sized to straddle the buffer boundary is still parsed as
    // exactly one record via the parallel (RecordSet) code path.
    #[test]
    fn bufflen() {
        let mut data: Cursor<Vec<u8>> = Cursor::new(vec![]);
        data.write(b"@").unwrap();
        for _ in 0..(super::BUFSIZE - 8) {
            data.write(b"a").unwrap();
        };
        data.write(b"\nA\n+\nB\n").unwrap();
        data.seek(SeekFrom::Start(0)).unwrap();
        let parser = Parser::new(data);
        let vals: Vec<u64> = parser.parallel_each(2, |sets| {
            let mut count = 0;
            for set in sets {
                for _ in set.iter() {
                    count += 1;
                }
            }
            count
        }).unwrap();
        assert!(vals.iter().sum::<u64>() == 1);
    }

    // record_sets() delivers all records across sets.
    #[test]
    fn refset() {
        let data = Cursor::new(b"@hi\nNN\n+\n++\n@hi\nNN\n+\n++\n");
        let parser = Parser::new(data);
        let mut count: usize = 0;
        for set in parser.record_sets() {
            let set = set.unwrap();
            for record in set.iter() {
                count += 1;
                assert_eq!(record.head(), b"hi");
                assert_eq!(record.seq(), b"NN");
                assert_eq!(record.qual(), b"++");
            }
        }
        assert_eq!(count, 2)
    }

    // Truncated input surfaces as an Err item from record_sets().
    #[test]
    fn refset_incomplete() {
        let data = Cursor::new(b"@hi\nNN\n+\n++\n@hi\nNN\n+\n++");
        let parser = Parser::new(data);
        assert!(parser.record_sets().any(|x| x.is_err()));
    }

    // An over-long record also errors through the record_sets() path.
    #[test]
    fn refset_huge_incomplete() {
        let mut data: Cursor<Vec<u8>> = Cursor::new(vec![]);
        data.write(b"@").unwrap();
        for _ in 0..super::BUFSIZE {
            data.write(b"longid").unwrap();
        };
        data.seek(SeekFrom::Start(0)).unwrap();
        let parser = Parser::new(data);
        assert!(parser.record_sets().any(|x| x.is_err()));
    }
}