1use ex::fs::File; use std::io::{BufRead, BufReader};
3use std::path::Path;
4use flate2::read::MultiGzDecoder;
5use crate::seq_db::SeqDB;
6use crate::{FileType};
7use crate::record::{MutRefRecord, OwnedRecord, RefRecord};
8
9pub struct StaticFastXReader<R: std::io::BufRead>{
11 pub filetype: FileType,
12 pub filename: Option<String>, pub input: R,
14 pub seq_buf: Vec<u8>,
15 pub head_buf: Vec<u8>,
16 pub qual_buf: Vec<u8>,
17 pub plus_buf: Vec<u8>, pub fasta_temp_buf: Vec<u8>, }
20
21pub trait SeqStream{
22 fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>;
23}
24
25trait JSeqIOReaderInterface{
28 fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>;
29 fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>;
30
31 fn into_db_boxed(self: Box<Self>) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>;
34 fn into_db_with_revcomp_boxed(self: Box<Self>) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>;
35
36 fn filetype(&self)-> FileType;
37
38 fn set_filepath(&mut self, filepath: &Path);
40}
41
42#[derive(Debug)]
43pub struct ParseError{
44 pub message: String,
45 pub filename: Option<String>,
46 pub filetype: Option<FileType>,
47}
48
49impl std::error::Error for ParseError{}
50
51impl std::fmt::Display for ParseError{
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "{:?}", self)
54 }
55}
56
57impl<R: std::io::BufRead> StaticFastXReader<R>{
58
59 fn build_parse_error(&self, message: &str) -> Box<ParseError>{
60 Box::new(
61 ParseError{
62 message: message.to_owned(),
63 filename: self.filename.clone(),
64 filetype: Some(self.filetype)
65 }
66 )
67 }
68
69 fn read_fasta_record(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>{
70 self.seq_buf.clear();
71 self.head_buf.clear();
72
73 if self.fasta_temp_buf.is_empty() {
75 let bytes_read = self.input.read_until(b'\n', &mut self.head_buf)?;
77 if bytes_read == 0 {return Ok(None)} } else{
79 self.head_buf.append(&mut self.fasta_temp_buf); }
82
83 loop{
85 let bytes_read = self.input.read_until(b'\n', &mut self.fasta_temp_buf)?;
86 if bytes_read == 0 {
87 if self.seq_buf.is_empty(){
89 return Err(self.build_parse_error("Empty sequence in FASTA file"));
91 }
92 break; }
94
95 let start = self.fasta_temp_buf.len() as isize - bytes_read as isize;
97 if self.fasta_temp_buf[start as usize] == b'>'{
98 break;
100 } else{
101 self.seq_buf.append(&mut self.fasta_temp_buf); self.seq_buf.pop(); }
105 }
106
107 for c in self.seq_buf.iter_mut() {
109 c.make_ascii_uppercase();
110 }
111
112 assert!(self.head_buf[0] == b'>');
114 assert!(*self.head_buf.last().unwrap() == b'\n');
115 let head_buf_len = self.head_buf.len();
116 let head = &mut self.head_buf.as_mut_slice()[1..head_buf_len-1]; Ok(Some(MutRefRecord{head: head,
119 seq: self.seq_buf.as_mut_slice(), qual: None}))
121 }
122
123 fn read_fastq_record(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>{
124
125 self.seq_buf.clear();
126 self.head_buf.clear();
127 self.qual_buf.clear();
128 self.plus_buf.clear();
129
130 let bytes_read = self.input.read_until(b'\n', &mut self.head_buf)?;
132 if bytes_read == 0 {return Ok(None)} if self.head_buf[0] != b'@'{
134 return Err(self.build_parse_error("FASTQ header line does not start with @"));
135 }
136 assert!(self.head_buf.last().unwrap() == &b'\n');
137 self.head_buf.pop(); let bytes_read = self.input.read_until(b'\n', &mut self.seq_buf)?;
141 if bytes_read == 0 {
142 return Err(self.build_parse_error("FASTQ sequence line missing.")); }
144 assert!(self.seq_buf.last().unwrap() == &b'\n');
145 self.seq_buf.pop(); let bytes_read = self.input.read_until(b'\n', &mut self.plus_buf)?;
149 if bytes_read == 0 {
150 return Err(self.build_parse_error("FASTQ + line missing.")); }
152
153 let bytes_read = self.input.read_until(b'\n', &mut self.qual_buf)?;
155 if bytes_read == 0 { return Err(self.build_parse_error("FASTQ quality line missing.")); }
158 assert!(self.qual_buf.last().unwrap() == &b'\n');
159 self.qual_buf.pop(); if self.qual_buf.len() != self.seq_buf.len() {
161 let msg = format!("FASTQ quality line has different length than sequence line ({} vs {})", self.qual_buf.len(), self.seq_buf.len());
162 return Err(self.build_parse_error(&msg));
163 }
164
165 for c in self.seq_buf.iter_mut() {
167 c.make_ascii_uppercase();
168 }
169
170 assert!(self.head_buf[0] == b'@');
171 Ok(Some(MutRefRecord{head: &mut self.head_buf[1..], seq: &mut self.seq_buf,
173 qual: Some(&mut self.qual_buf)}))
174 }
175
176 pub fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
179 match self.filetype{
180 FileType::FASTA => self.read_fasta_record().map(|opt| opt.map(|rec| rec.into_shared_ref())),
181 FileType::FASTQ => self.read_fastq_record().map(|opt| opt.map(|rec| rec.into_shared_ref())),
182 }
183 }
184
185 pub fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>> {
188 match self.filetype{
189 FileType::FASTA => self.read_fasta_record(),
190 FileType::FASTQ => self.read_fastq_record(),
191 }
192 }
193
194 fn new_with_format(input: R, filetype: FileType) -> Self{
196 StaticFastXReader{filetype,
197 input,
198 filename: None,
199 seq_buf: Vec::<u8>::new(),
200 head_buf: Vec::<u8>::new(),
201 qual_buf: Vec::<u8>::new(),
202 plus_buf: Vec::<u8>::new(),
203 fasta_temp_buf: Vec::<u8>::new(),}
204 }
205
206 pub fn new(mut input: R) -> Result<Self, Box<dyn std::error::Error>>{
208 let bytes = input.fill_buf()?;
209
210 let mut filetype = FileType::FASTA;
212
213 if !bytes.is_empty(){
214 filetype = match bytes[0]{
215 b'>' => FileType::FASTA,
216 b'@' => FileType::FASTQ,
217 _ => return Err(
218 Box::new(ParseError{message: "Error: File does not start with '>' or '@'".to_owned(),
219 filename: None,
220 filetype: None}))
221 }
222 }
223
224 Ok(StaticFastXReader::new_with_format(input, filetype))
225
226 }
227
228 pub fn into_db_with_revcomp(mut self) -> Result<(SeqDB, SeqDB), Box<dyn std::error::Error>>{
229
230 let mut rc_record =
232 OwnedRecord{
233 head: Vec::new(),
234 seq: Vec::new(),
235 qual: match self.filetype{
236 FileType::FASTA => None,
237 FileType::FASTQ => Some(Vec::new()),
238 }
239 };
240
241 let mut fw_db = SeqDB::new();
242 let mut rc_db = SeqDB::new();
243
244 while let Some(rec) = self.read_next()?{
245 fw_db.push_record(rec);
246
247 rc_record.head.clear();
249 rc_record.head.extend_from_slice(rec.head);
250 rc_record.seq.clear();
251 rc_record.seq.extend_from_slice(rec.seq);
252 if let Some(qual) = &mut rc_record.qual{
253 qual.clear();
254 qual.extend_from_slice(rec.qual.unwrap());
255 }
256
257 rc_record.reverse_complement();
259 rc_db.push_record(rc_record.as_ref_record());
260 }
261 fw_db.shrink_to_fit();
262 rc_db.shrink_to_fit();
263 Ok((fw_db, rc_db))
264 }
265
266 pub fn into_db(mut self) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
267 let mut db = SeqDB::new();
268
269 while let Some(rec) = self.read_next()?{
270 db.push_record(rec);
271 }
272 db.shrink_to_fit();
273 Ok(db)
274 }
275
276
277
278}
279
280
281pub struct DynamicFastXReader {
282 stream: Box<dyn JSeqIOReaderInterface + Send>,
283 compression_type: crate::CompressionType,
284}
285
286impl DynamicFastXReader {
289
290 pub fn from_file<P: AsRef<std::path::Path>>(filepath: &P) -> Result<Self, Box<dyn std::error::Error>> {
292 let input = File::open(filepath).unwrap();
293 let mut reader = Self::new(BufReader::new(input))?;
294 reader.stream.set_filepath(filepath.as_ref());
295 Ok(reader)
296 }
297
298 pub fn from_stdin() -> Result<Self, Box<dyn std::error::Error>> {
300 let input = std::io::stdin();
301 let reader = Self::new(BufReader::new(input))?;
302 Ok(reader)
303 }
304
305 pub fn new<R: std::io::BufRead + 'static + Send>(mut input: R) -> Result<Self, Box<dyn std::error::Error>>{
307 let bytes = input.fill_buf()?;
308 let mut gzipped = false;
309 match bytes.len(){
310 0 => (), 1 => return Err(Box::new(ParseError{message: "Corrupt FASTA/FASTQ file: only one byte found.".to_owned(),
312 filename: None,
313 filetype: None})),
314 _ => { if bytes[0] == 0x1f && bytes[1] == 0x8b{
316 gzipped = true;
317 }
318 }
319 }
320
321 match gzipped{
322 true => {
323 let gzdecoder = MultiGzDecoder::<R>::new(input);
324
325 let gzbufdecoder = BufReader::<MultiGzDecoder::<R>>::new(gzdecoder);
327 Self::from_raw_stream(gzbufdecoder, crate::CompressionType::Gzip)
328 },
329 false => Self::from_raw_stream(input, crate::CompressionType::None)
330 }
331 }
332
333 fn from_raw_stream<R: std::io::BufRead + 'static + Send>(r: R, compression_type: crate::CompressionType) -> Result<Self, Box<dyn std::error::Error>>{
336 let reader = StaticFastXReader::<R>::new(r)?;
337 Ok(DynamicFastXReader {stream: Box::new(reader), compression_type})
338 }
339
340 pub fn into_db(self) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
341 self.stream.into_db_boxed()
342 }
343
344 pub fn into_db_with_revcomp(self) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>{
345 self.stream.into_db_with_revcomp_boxed()
346 }
347
348 pub fn compression_type(&self) -> crate::CompressionType{
349 self.compression_type
350 }
351
352 pub fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
353 self.stream.read_next()
354 }
355
356 pub fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>{
357 self.stream.read_next_mut()
358 }
359
360 pub fn filetype(&self)-> FileType{
361 self.stream.filetype()
362 }
363
364 pub fn set_filepath(&mut self, filepath: &Path){
366 self.stream.set_filepath(filepath);
367 }
368}
369
370
371impl<R: BufRead> JSeqIOReaderInterface for StaticFastXReader<R>{
374
375 fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
376 self.read_next()
377 }
378
379 fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>> {
380 self.read_next_mut()
381 }
382
383 fn filetype(&self)-> FileType{
384 self.filetype
385 }
386
387 fn into_db_boxed(self: Box<Self>) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
388 self.into_db()
389 }
390
391 fn into_db_with_revcomp_boxed(self: Box<Self>) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>{
392 self.into_db_with_revcomp()
393 }
394
395 fn set_filepath(&mut self, filepath: &Path){
397 self.filename = Some(filepath.as_os_str().to_str().unwrap().to_owned());
398 }
399
400}
401
402impl SeqStream for DynamicFastXReader {
403 fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
404 DynamicFastXReader::read_next(self)
405 }
406}
407
408impl<R: BufRead> SeqStream for StaticFastXReader<R> {
409 fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
410 StaticFastXReader::read_next(self)
411 }
412}
413
414pub struct SeqStreamWithRevComp<S: SeqStream> {
417 inner: S,
418 rec: OwnedRecord,
419 parity: bool, }
421
422impl<S: SeqStream> SeqStreamWithRevComp<S> {
423 pub fn new(inner: S) -> Self{
424 Self{
425 inner,
426 rec: OwnedRecord{seq: Vec::new(), head: Vec::new(), qual: None},
427 parity: false,
428 }
429 }
430}
431
432impl<S: SeqStream> SeqStream for SeqStreamWithRevComp<S> {
433 fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
436 self.parity = !self.parity;
437
438 if self.parity {
439 let new = match self.inner.read_next()? {
440 None => return Ok(None), Some(r) => r
442 };
443
444 self.rec.head.clear();
446 self.rec.head.extend(new.head);
447 self.rec.seq.clear();
448 self.rec.seq.extend(new.seq);
449 if let Some(q) = new.qual {
450 if self.rec.qual.is_none() {
451 self.rec.qual = Some(Vec::<u8>::new());
452 }
453 self.rec.qual.as_mut().unwrap().clear();
454 self.rec.qual.as_mut().unwrap().extend(q);
455 }
456
457 return Ok(Some(self.rec.as_ref_record()));
458
459 } else {
460 crate::reverse_complement_in_place(&mut self.rec.seq);
461 return Ok(Some(self.rec.as_ref_record()));
462 }
463 }
464}