use std::borrow::Cow;
use std::fmt::Debug;
use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Cursor;
use std::io::Seek;
use std::io::SeekFrom;
use std::iter::FusedIterator;
use std::path::Path;
use std::sync::RwLock;
mod ioslice;
mod parser;
mod reader;
use self::ioslice::IoSlice;
use self::reader::CStringReader;
use self::reader::LengthReader;
use self::reader::MaskReader;
use self::reader::SequenceReader;
use super::Rc;
use crate::data::Flag;
use crate::data::Flags;
use crate::data::Header;
use crate::data::MaskUnit;
use crate::data::Record;
use crate::data::SequenceType;
use crate::error::Error;
type ZstdDecoder<'z, R> = BufReader<zstd::Decoder<'z, BufReader<IoSlice<R>>>>;
/// Builder configuring which fields a [`Decoder`] decompresses and how the
/// decompressed blocks are buffered.
#[derive(Debug, Clone)]
pub struct DecoderBuilder {
    /// Capacity of the `BufReader` wrapping each decompressed block.
    buffer_size: usize,
    /// Whether the identifier block is decompressed.
    id: bool,
    /// Whether the comment block is decompressed.
    comment: bool,
    /// Whether the sequence block is decompressed.
    sequence: bool,
    /// Whether the quality block is decompressed.
    quality: bool,
    /// Whether the mask block is decompressed and applied to sequences.
    mask: bool,
}
impl DecoderBuilder {
pub fn new() -> Self {
Self {
buffer_size: 4096,
id: true,
comment: true,
sequence: true,
quality: true,
mask: true,
}
}
pub fn from_flags<F: Into<Flags>>(flags: F) -> Self {
let flags = flags.into();
let mut builder = Self::new();
builder.quality(flags.test(Flag::Quality));
builder.sequence(flags.test(Flag::Sequence));
builder.mask(flags.test(Flag::Mask));
builder.comment(flags.test(Flag::Comment));
builder
}
pub fn buffer_size(&mut self, buffer_size: usize) -> &mut Self {
self.buffer_size = buffer_size;
self
}
#[inline]
pub fn id(&mut self, id: bool) -> &mut Self {
self.id = id;
self
}
#[inline]
pub fn comment(&mut self, comment: bool) -> &mut Self {
self.comment = comment;
self
}
#[inline]
pub fn sequence(&mut self, sequence: bool) -> &mut Self {
self.sequence = sequence;
self
}
#[inline]
pub fn quality(&mut self, quality: bool) -> &mut Self {
self.quality = quality;
self
}
#[inline]
pub fn mask(&mut self, mask: bool) -> &mut Self {
self.mask = mask;
self
}
pub fn with_bytes<'data, 'z>(
&self,
bytes: &'data [u8],
) -> Result<Decoder<'z, BufReader<Cursor<&'data [u8]>>>, Error> {
self.with_reader(BufReader::new(Cursor::new(bytes)))
}
pub fn with_path<'z, P: AsRef<Path>>(
&self,
path: P,
) -> Result<Decoder<'z, BufReader<File>>, Error> {
File::open(path.as_ref())
.map_err(Error::from)
.and_then(|f| self.with_reader(std::io::BufReader::new(f)))
}
pub fn with_reader<'z, R: BufRead + Seek>(
&self,
mut reader: R,
) -> Result<Decoder<'z, R>, Error> {
let buffer = reader.fill_buf()?;
let header = match self::parser::header(buffer) {
Ok((i, header)) => {
let consumed = buffer.len() - i.len();
reader.consume(consumed);
header
}
Err(nom::Err::Incomplete(_)) => {
return Err(Error::from(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to read header",
)));
}
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
return Err(Error::from(e));
}
};
if header.flags().test(Flag::Title) {
let buf = reader.fill_buf()?;
let (i, _title) = self::parser::title(buf)?;
let consumed = buf.len() - i.len();
reader.consume(consumed);
}
let rc = Rc::new(RwLock::new(reader));
macro_rules! setup_block {
($flags:expr, $flag:ident, $use_block:expr, $rc:ident, $block:ident) => {
let _length: u64;
setup_block!($flags, $flag, $use_block, $rc, $block, _length);
};
($flags:expr, $flag:ident, $use_block:expr, $rc:ident, $block:ident, $block_length:ident) => {
let $block;
if $flags.test(Flag::$flag) {
let tee = $rc.clone();
let mut handle = $rc.write().unwrap();
let buf = handle.fill_buf()?;
let (i, original_size) = self::parser::variable_u64(buf)?;
let (i, compressed_size) = self::parser::variable_u64(i)?;
$block_length = original_size;
let consumed = buf.len() - i.len();
handle.consume(consumed);
if $use_block {
let pos = handle.stream_position()?;
let tee_slice = IoSlice::new(tee, pos, pos + compressed_size);
let mut decoder = zstd::stream::read::Decoder::new(tee_slice)?;
decoder.include_magicbytes(false)?;
$block = Some(BufReader::with_capacity(self.buffer_size, decoder));
} else {
$block = None;
}
handle.seek(SeekFrom::Current(compressed_size as i64))?;
} else {
$block = None;
}
};
}
let flags = header.flags();
let mut seqlen = 0;
setup_block!(flags, Id, self.id, rc, ids_block);
setup_block!(flags, Comment, self.comment, rc, com_block);
setup_block!(flags, Length, true, rc, len_block);
setup_block!(flags, Mask, self.mask, rc, mask_block);
setup_block!(flags, Sequence, self.sequence, rc, seq_block, seqlen);
setup_block!(flags, Quality, self.quality, rc, quality_block);
Ok(Decoder {
ids: ids_block.map(CStringReader::new),
com: com_block.map(CStringReader::new),
len: len_block.map(LengthReader::new),
seq: seq_block.map(|x| SequenceReader::new(x, header.sequence_type())),
qual: quality_block.map(|x| SequenceReader::new(x, SequenceType::Text)),
mask: mask_block.map(|x| MaskReader::new(x, seqlen)),
n: 0,
header,
reader: rc,
unit: MaskUnit::Unmasked(0),
})
}
}
impl Default for DecoderBuilder {
fn default() -> Self {
Self::new()
}
}
/// Decoder yielding [`Record`]s from an archive read through `R`.
///
/// Each optional reader decodes one block. It is `None` when that block is
/// absent from the archive or was disabled in the [`DecoderBuilder`].
pub struct Decoder<'z, R: BufRead + Seek> {
    /// Header parsed from the start of the archive.
    header: Header,
    /// Underlying reader, shared with the `IoSlice` of every block decoder.
    reader: Rc<RwLock<R>>,
    /// Reader over the identifier block.
    ids: Option<CStringReader<ZstdDecoder<'z, R>>>,
    /// Reader over the comment block.
    com: Option<CStringReader<ZstdDecoder<'z, R>>>,
    /// Reader over the length block; a record's sequence and quality are
    /// only read when a length is available.
    len: Option<LengthReader<ZstdDecoder<'z, R>>>,
    /// Reader over the sequence block.
    seq: Option<SequenceReader<ZstdDecoder<'z, R>>>,
    /// Reader over the quality block, decoded as text.
    qual: Option<SequenceReader<ZstdDecoder<'z, R>>>,
    /// Reader over the mask block, yielding masked/unmasked run lengths.
    mask: Option<MaskReader<ZstdDecoder<'z, R>>>,
    /// Number of records decoded so far.
    n: usize,
    /// Mask run left over from the previous record, carried into the next.
    unit: MaskUnit,
}
impl Decoder<'_, BufReader<File>> {
    /// Open the file at `path` and decode every available field.
    ///
    /// Shorthand for `DecoderBuilder::new().with_path(path)`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or the archive
    /// header cannot be read.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let builder = DecoderBuilder::new();
        builder.with_path(path)
    }
}
impl<R: BufRead + Seek> Decoder<'_, R> {
pub fn new(reader: R) -> Result<Self, Error> {
DecoderBuilder::new().with_reader(reader)
}
#[inline]
pub fn header(&self) -> &Header {
&self.header
}
#[inline]
pub fn sequence_type(&self) -> SequenceType {
self.header().sequence_type()
}
pub fn into_inner(self) -> R {
let reader = self.reader.clone();
drop(self);
Rc::into_inner(reader)
.expect("reference count should be 1 after decoder is dropped")
.into_inner()
.expect("lock shouldn't be poisoned")
}
fn next_record(&mut self) -> Result<Record<'static>, Error> {
let id = self
.ids
.as_mut()
.and_then(|r| r.next())
.transpose()?
.map(|id| id.into_string().map(Cow::Owned).expect("TODO"));
let comment = self
.com
.as_mut()
.and_then(|r| r.next())
.transpose()?
.map(|com| com.into_string().map(Cow::Owned).expect("TODO"));
let length = self.len.as_mut().and_then(|r| r.next()).transpose()?;
let mut sequence: Option<Cow<'static, str>> = None;
let mut quality = None;
if let Some(l) = length {
sequence = self
.seq
.as_mut()
.map(|r| r.next(l))
.transpose()?
.map(Cow::Owned);
quality = self
.qual
.as_mut()
.map(|r| r.next(l))
.transpose()?
.map(Cow::Owned);
if let Some(seq) = sequence.as_mut() {
self.mask_sequence(seq.to_mut())?;
}
}
self.n += 1;
Ok(Record {
id,
comment,
sequence,
quality,
length,
})
}
fn mask_sequence(&mut self, sequence: &mut str) -> Result<(), Error> {
let mut mask = self.unit.clone();
let mut seq = sequence;
if let Some(mask_reader) = self.mask.as_mut() {
loop {
match mask {
MaskUnit::Masked(n) => {
if n < seq.len() as u64 {
seq[..n as usize].make_ascii_lowercase();
seq = &mut seq[n as usize..];
} else {
self.unit = MaskUnit::Masked(n - seq.len() as u64);
break;
}
}
MaskUnit::Unmasked(n) => {
if n < seq.len() as u64 {
seq = &mut seq[n as usize..];
} else {
self.unit = MaskUnit::Unmasked(n - seq.len() as u64);
break;
}
}
}
mask = match mask_reader.next() {
Some(Ok(x)) => x,
Some(Err(e)) => return Err(Error::Io(e)),
None => {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to get mask unit",
)))
}
};
}
}
Ok(())
}
}
impl<R: BufRead + Seek> Iterator for Decoder<'_, R> {
    type Item = Result<Record<'static>, Error>;

    /// Decode the next record. Returns `None` once the number of records
    /// declared in the header has been produced.
    fn next(&mut self) -> Option<Self::Item> {
        if self.n as u64 >= self.header.number_of_sequences() {
            return None;
        }
        Some(self.next_record())
    }

    /// Report the exact number of records not yet produced.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // The count is computed in `u64`. Casting the header count to `usize`
        // first would truncate it on 32-bit targets, and the subtraction
        // could then underflow.
        let remaining = self
            .header
            .number_of_sequences()
            .saturating_sub(self.n as u64);
        let remaining = remaining.min(usize::MAX as u64) as usize;
        (remaining, Some(remaining))
    }
}
// `size_hint` reports the records declared in the header minus those already
// decoded, so the length is exact.
impl<R> ExactSizeIterator for Decoder<'_, R>
where
    R: BufRead + Seek,
{
}

// `next` returns `None` once the record counter reaches the header count. The
// counter only advances when a record is decoded, so it keeps returning `None`.
impl<R> FusedIterator for Decoder<'_, R>
where
    R: BufRead + Seek,
{
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference archive containing 12 records.
    const ARCHIVE: &[u8] = include_bytes!("../../../data/LuxC.naf");

    #[test]
    fn error_empty() {
        // An empty input cannot even hold a header.
        match Decoder::new(std::io::Cursor::new(b"")) {
            Err(Error::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_decoder) => panic!("unexpected success"),
        }
    }

    #[test]
    fn decoder() {
        let decoder = Decoder::new(std::io::Cursor::new(ARCHIVE)).unwrap();
        let records: Vec<_> = decoder.collect::<Result<_, _>>().unwrap();
        assert_eq!(records.len(), 12);
    }

    #[test]
    fn masks() {
        const MASKED: &[u8] = include_bytes!("../../../data/masked.naf");
        let expected = [
            MaskUnit::Unmasked(657),
            MaskUnit::Masked(19),
            MaskUnit::Unmasked(635),
            MaskUnit::Masked(39),
            MaskUnit::Unmasked(725),
        ];
        let decoder = Decoder::new(std::io::Cursor::new(MASKED)).unwrap();
        let mut mask_reader = decoder.mask.unwrap();
        for unit in expected.iter() {
            assert_eq!(&mask_reader.next().unwrap().unwrap(), unit);
        }
    }

    #[test]
    fn skip_sequence() {
        let mut builder = DecoderBuilder::new();
        builder.sequence(false);
        let decoder = builder
            .with_reader(std::io::Cursor::new(ARCHIVE))
            .unwrap();
        for record in decoder {
            let record = record.unwrap();
            assert!(record.sequence.is_none());
        }
    }
}