use ex::fs::File; use std::io::{BufRead, BufReader};
use std::path::Path;
use flate2::read::MultiGzDecoder;
use crate::seq_db::SeqDB;
use crate::{FileType};
use crate::record::{RefRecord, OwnedRecord};
/// FASTA/FASTQ reader over a buffered input whose concrete type is known at compile time.
///
/// The byte buffers are reused from record to record; the slices inside a returned
/// `RefRecord` point into them.
pub struct StaticFastXReader<R: std::io::BufRead> {
    /// Format of the input; selects which record parser `read_next` dispatches to.
    pub filetype: FileType,
    /// Name of the input, if one has been set. Copied into parse errors.
    pub filename: Option<String>,
    /// The buffered input the records are read from.
    pub input: R,
    /// Sequence bytes of the record read most recently.
    pub seq_buf: Vec<u8>,
    /// Header line of the record read most recently.
    pub head_buf: Vec<u8>,
    /// FASTQ quality line of the record read most recently.
    pub qual_buf: Vec<u8>,
    /// FASTQ separator ("+") line of the record read most recently.
    pub plus_buf: Vec<u8>,
    /// FASTA only: holds the header line of the *next* record when it was read
    /// while looking for the end of the current record's sequence.
    pub fasta_temp_buf: Vec<u8>,
}
/// A source of sequence records that are handed out one at a time.
pub trait SeqStream{
/// Returns the next record, `Ok(None)` when the stream has no more records,
/// or an error if reading or parsing fails.
fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>;
}
/// Private interface through which `DynamicFastXReader` talks to a reader stored
/// as a boxed trait object.
trait JSeqIOReaderInterface{
/// Returns the next record, or `Ok(None)` when no records are left.
fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>;
/// Consumes the boxed reader and returns a `SeqDB`.
/// Takes `self: Box<Self>` so it can be called on a trait object.
fn into_db_boxed(self: Box<Self>) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>;
/// Consumes the boxed reader and returns a pair of `SeqDB`s.
/// Takes `self: Box<Self>` so it can be called on a trait object.
fn into_db_with_revcomp_boxed(self: Box<Self>) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>;
/// Returns the file format of the reader.
fn filetype(&self)-> FileType;
/// Records the path of the input.
fn set_filepath(&mut self, filepath: &Path);
}
/// Error describing a problem found while parsing FASTA/FASTQ input.
#[derive(Debug)]
pub struct ParseError{
/// Human-readable description of the problem.
pub message: String,
/// Name of the input the error refers to, when known.
pub filename: Option<String>,
/// Format of the input, when known.
pub filetype: Option<FileType>,
}
// Marker impl: lets `ParseError` be returned as a `Box<dyn std::error::Error>`.
impl std::error::Error for ParseError{}
impl std::fmt::Display for ParseError {
    /// Writes the error using its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fresh format arguments are used on purpose, so formatter flags on `f`
        // do not change how the Debug output is rendered.
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl<R: std::io::BufRead> StaticFastXReader<R>{
/// Creates a boxed `ParseError` carrying `message` together with this reader's
/// file name and file type.
fn build_parse_error(&self, message: &str) -> Box<ParseError> {
    let filename = self.filename.clone();
    let filetype = Some(self.filetype);
    Box::new(ParseError {
        message: String::from(message),
        filename,
        filetype,
    })
}
/// Reads the next FASTA record.
///
/// The sequence may span several lines; they are concatenated with the line
/// feeds removed and the result is converted to ASCII upper case. The header is
/// returned without the leading `>` and without the trailing line feed.
///
/// Returns `Ok(None)` when the input is exhausted.
///
/// # Errors
/// * I/O errors from the underlying input.
/// * A `ParseError` if the input ends before any sequence data was read for the
///   record, or if the header line does not start with `>`.
fn read_fasta_record(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
    self.seq_buf.clear();
    self.head_buf.clear();

    if self.fasta_temp_buf.is_empty() {
        // No header was carried over from the previous call: read it now.
        let bytes_read = self.input.read_until(b'\n', &mut self.head_buf)?;
        if bytes_read == 0 {
            return Ok(None); // End of input
        }
    } else {
        // The previous call already consumed this record's header line.
        self.head_buf.append(&mut self.fasta_temp_buf);
    }

    // Accumulate sequence lines until the next header line or the end of input.
    loop {
        let bytes_read = self.input.read_until(b'\n', &mut self.fasta_temp_buf)?;
        if bytes_read == 0 {
            if self.seq_buf.is_empty() {
                return Err(self.build_parse_error("Empty sequence in FASTA file"));
            }
            break;
        }

        // Index of the first byte of the line that was just read.
        let start = self.fasta_temp_buf.len() - bytes_read;
        if self.fasta_temp_buf[start] == b'>' {
            // Header of the next record: leave it in fasta_temp_buf for the next call.
            break;
        }

        self.seq_buf.append(&mut self.fasta_temp_buf);
        // Drop the line feed only if there is one. The last line of the input
        // may end without it, and then the final byte is sequence data.
        if self.seq_buf.last() == Some(&b'\n') {
            self.seq_buf.pop();
        }
    }

    self.seq_buf.make_ascii_uppercase();

    let head = match self.head_buf.as_slice().strip_prefix(b">") {
        Some(rest) => rest,
        None => return Err(self.build_parse_error("FASTA header line does not start with >")),
    };
    let head = head.strip_suffix(b"\n").unwrap_or(head);

    Ok(Some(RefRecord{head, seq: self.seq_buf.as_slice(), qual: None}))
}
/// Reads the next FASTQ record, which consists of four lines: header, sequence,
/// separator and quality.
///
/// The header is returned without the leading `@`; header, sequence and quality
/// are returned without their trailing line feed. The sequence is converted to
/// ASCII upper case.
///
/// Returns `Ok(None)` when the input is exhausted.
///
/// # Errors
/// * I/O errors from the underlying input.
/// * A `ParseError` if the header does not start with `@`, if any of the four
///   lines is missing, or if sequence and quality differ in length.
fn read_fastq_record(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
    /// Returns `line` without its trailing line feed, if it has one.
    fn trim_newline(line: &[u8]) -> &[u8] {
        line.strip_suffix(b"\n").unwrap_or(line)
    }

    self.seq_buf.clear();
    self.head_buf.clear();
    self.qual_buf.clear();
    self.plus_buf.clear();

    if self.input.read_until(b'\n', &mut self.head_buf)? == 0 {
        return Ok(None); // End of input
    }
    if self.head_buf[0] != b'@' {
        return Err(self.build_parse_error("FASTQ header line does not start with @"));
    }

    if self.input.read_until(b'\n', &mut self.seq_buf)? == 0 {
        return Err(self.build_parse_error("FASTQ sequence line missing."));
    }
    if self.input.read_until(b'\n', &mut self.plus_buf)? == 0 {
        return Err(self.build_parse_error("FASTQ + line missing."));
    }
    if self.input.read_until(b'\n', &mut self.qual_buf)? == 0 {
        return Err(self.build_parse_error("FASTQ quality line missing."));
    }

    // Compare the lengths without line feeds: the quality line may be the last
    // line of the input and end without one.
    let seq_len = trim_newline(&self.seq_buf).len();
    let qual_len = trim_newline(&self.qual_buf).len();
    if qual_len != seq_len {
        let msg = format!("FASTQ quality line has different length than sequence line ({} vs {})", qual_len, seq_len);
        return Err(self.build_parse_error(&msg));
    }

    self.seq_buf.make_ascii_uppercase();

    Ok(Some(RefRecord{
        // head_buf[0] was verified to be '@' above, so skipping one byte is safe.
        head: trim_newline(&self.head_buf[1..]),
        seq: trim_newline(&self.seq_buf),
        qual: Some(trim_newline(&self.qual_buf)),
    }))
}
/// Reads the next record using the parser that matches `self.filetype`.
///
/// Returns `Ok(None)` once the input is exhausted.
pub fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
    let format = self.filetype;
    match format {
        FileType::FASTQ => self.read_fastq_record(),
        FileType::FASTA => self.read_fasta_record(),
    }
}
fn new_with_format(input: R, filetype: FileType) -> Self{
StaticFastXReader{filetype,
input,
filename: None,
seq_buf: Vec::<u8>::new(),
head_buf: Vec::<u8>::new(),
qual_buf: Vec::<u8>::new(),
plus_buf: Vec::<u8>::new(),
fasta_temp_buf: Vec::<u8>::new(),}
}
pub fn new(mut input: R) -> Result<Self, Box<dyn std::error::Error>>{
let bytes = input.fill_buf()?;
let mut filetype = FileType::FASTA;
if !bytes.is_empty(){
filetype = match bytes[0]{
b'>' => FileType::FASTA,
b'@' => FileType::FASTQ,
_ => return Err(
Box::new(ParseError{message: "Error: File does not start with '>' or '@'".to_owned(),
filename: None,
filetype: None}))
}
}
Ok(StaticFastXReader::new_with_format(input, filetype))
}
pub fn into_db_with_revcomp(mut self) -> Result<(SeqDB, SeqDB), Box<dyn std::error::Error>>{
let mut rc_record =
OwnedRecord{
head: Vec::new(),
seq: Vec::new(),
qual: match self.filetype{
FileType::FASTA => None,
FileType::FASTQ => Some(Vec::new()),
}
};
let mut fw_db = SeqDB::new();
let mut rc_db = SeqDB::new();
while let Some(rec) = self.read_next()?{
fw_db.push_record(rec);
rc_record.head.clear();
rc_record.head.extend_from_slice(rec.head);
rc_record.seq.clear();
rc_record.seq.extend_from_slice(rec.seq);
if let Some(qual) = &mut rc_record.qual{
qual.clear();
qual.extend_from_slice(rec.qual.unwrap());
}
rc_record.reverse_complement();
rc_db.push_record(rc_record.as_ref_record());
}
fw_db.shrink_to_fit();
rc_db.shrink_to_fit();
Ok((fw_db, rc_db))
}
pub fn into_db(mut self) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
let mut db = SeqDB::new();
while let Some(rec) = self.read_next()?{
db.push_record(rec);
}
db.shrink_to_fit();
Ok(db)
}
}
/// FASTA/FASTQ reader that hides the concrete input type behind a boxed trait object.
pub struct DynamicFastXReader {
// The underlying reader, type-erased.
stream: Box<dyn JSeqIOReaderInterface + Send>,
// Compression that was detected when the reader was created.
compression_type: crate::CompressionType,
}
impl DynamicFastXReader {
/// Opens the file at `filepath` and creates a reader for it. The path is
/// recorded on the reader.
///
/// # Errors
/// Returns an error if the file cannot be opened, or any error from
/// [`DynamicFastXReader::new`].
pub fn from_file<P: AsRef<std::path::Path>>(filepath: &P) -> Result<Self, Box<dyn std::error::Error>> {
    // Propagate open failures to the caller instead of panicking.
    let input = File::open(filepath)?;
    let mut reader = Self::new(BufReader::new(input))?;
    reader.stream.set_filepath(filepath.as_ref());
    Ok(reader)
}
/// Creates a reader over standard input, wrapped in a `BufReader`.
pub fn from_stdin() -> Result<Self, Box<dyn std::error::Error>> {
    Self::new(BufReader::new(std::io::stdin()))
}
pub fn new<R: std::io::BufRead + 'static + Send>(mut input: R) -> Result<Self, Box<dyn std::error::Error>>{
let bytes = input.fill_buf()?;
let mut gzipped = false;
match bytes.len(){
0 => (), 1 => return Err(Box::new(ParseError{message: "Corrupt FASTA/FASTQ file: only one byte found.".to_owned(),
filename: None,
filetype: None})),
_ => { if bytes[0] == 0x1f && bytes[1] == 0x8b{
gzipped = true;
}
}
}
match gzipped{
true => {
let gzdecoder = MultiGzDecoder::<R>::new(input);
let gzbufdecoder = BufReader::<MultiGzDecoder::<R>>::new(gzdecoder);
Self::from_raw_stream(gzbufdecoder, crate::CompressionType::Gzip)
},
false => Self::from_raw_stream(input, crate::CompressionType::None)
}
}
fn from_raw_stream<R: std::io::BufRead + 'static + Send>(r: R, compression_type: crate::CompressionType) -> Result<Self, Box<dyn std::error::Error>>{
let reader = StaticFastXReader::<R>::new(r)?;
Ok(DynamicFastXReader {stream: Box::new(reader), compression_type})
}
/// Consumes the reader and returns a `SeqDB`, delegating to the boxed stream's `into_db_boxed`.
pub fn into_db(self) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
self.stream.into_db_boxed()
}
/// Consumes the reader and returns a pair of `SeqDB`s, delegating to the boxed
/// stream's `into_db_with_revcomp_boxed`.
pub fn into_db_with_revcomp(self) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>{
self.stream.into_db_with_revcomp_boxed()
}
/// Returns the compression type stored when this reader was created.
pub fn compression_type(&self) -> crate::CompressionType{
self.compression_type
}
/// Returns the next record from the underlying stream, or `Ok(None)` when it has none left.
pub fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
self.stream.read_next()
}
/// Returns the file format reported by the underlying stream.
pub fn filetype(&self)-> FileType{
self.stream.filetype()
}
/// Passes `filepath` on to the underlying stream's `set_filepath`.
pub fn set_filepath(&mut self, filepath: &Path){
self.stream.set_filepath(filepath);
}
}
impl<R: BufRead> JSeqIOReaderInterface for StaticFastXReader<R>{
fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
self.read_next()
}
fn filetype(&self)-> FileType{
self.filetype
}
fn into_db_boxed(self: Box<Self>) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
self.into_db()
}
fn into_db_with_revcomp_boxed(self: Box<Self>) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>{
self.into_db_with_revcomp()
}
fn set_filepath(&mut self, filepath: &Path){
self.filename = Some(filepath.as_os_str().to_str().unwrap().to_owned());
}
}
// Exposes DynamicFastXReader through the SeqStream trait.
impl SeqStream for DynamicFastXReader {
/// Forwards to the inherent `DynamicFastXReader::read_next`.
fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
DynamicFastXReader::read_next(self)
}
}
// Exposes StaticFastXReader through the SeqStream trait.
impl<R: BufRead> SeqStream for StaticFastXReader<R> {
/// Forwards to the inherent `StaticFastXReader::read_next`.
fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
StaticFastXReader::read_next(self)
}
}
/// Stream adapter that yields every record of the wrapped stream followed by a
/// reverse-complemented copy of it.
pub struct SeqStreamWithRevComp<S: SeqStream> {
    /// The wrapped stream that supplies the forward records.
    inner: S,
    /// Owned copy of the record most recently read from `inner`.
    rec: OwnedRecord,
    /// Tracks whether the next call yields a forward record or the reverse
    /// complement of `rec`.
    parity: bool,
}
impl<S: SeqStream> SeqStreamWithRevComp<S> {
    /// Wraps `inner`, starting with an empty record buffer and `parity` set to `false`.
    pub fn new(inner: S) -> Self {
        let rec = OwnedRecord { head: Vec::new(), seq: Vec::new(), qual: None };
        Self { inner, rec, parity: false }
    }
}
impl<S: SeqStream> SeqStream for SeqStreamWithRevComp<S> {
    /// Returns records in pairs: first a record from the wrapped stream, then
    /// the reverse complement of that same record.
    ///
    /// Returns `Ok(None)` when the wrapped stream is exhausted. The internal
    /// state only advances after a successful read, so further calls after
    /// `Ok(None)` or after an error simply ask the wrapped stream again.
    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
        if self.parity {
            // A forward record was handed out last time: emit its reverse complement.
            crate::reverse_complement_in_place(&mut self.rec.seq);
            // Reverse the qualities too so they stay aligned with the reversed bases.
            if let Some(qual) = self.rec.qual.as_mut() {
                qual.reverse();
            }
            self.parity = false;
            return Ok(Some(self.rec.as_ref_record()));
        }

        let new = match self.inner.read_next()? {
            None => return Ok(None), // End of stream; parity is left untouched
            Some(r) => r,
        };

        // Copy the record into the owned buffer so it can be flipped on the next call.
        self.rec.head.clear();
        self.rec.head.extend_from_slice(new.head);
        self.rec.seq.clear();
        self.rec.seq.extend_from_slice(new.seq);
        match new.qual {
            Some(q) => {
                let qual = self.rec.qual.get_or_insert_with(Vec::new);
                qual.clear();
                qual.extend_from_slice(q);
            }
            // Do not keep qualities of an earlier record around.
            None => self.rec.qual = None,
        }

        self.parity = true;
        Ok(Some(self.rec.as_ref_record()))
    }
}