jseqio/
reader.rs

1use ex::fs::File; // File streams that include the filename in the error messages
2use std::io::{BufRead, BufReader};
3use std::path::Path;
4use flate2::read::MultiGzDecoder;
5use crate::seq_db::SeqDB;
6use crate::{FileType};
7use crate::record::{MutRefRecord, OwnedRecord, RefRecord};
8
9// Takes a BufRead because we need read_until.
10pub struct StaticFastXReader<R: std::io::BufRead>{
11    pub filetype: FileType,
12    pub filename: Option<String>, // Only used for error messages. If None, the file is unknown or there is no file, like when reading from stdin.
13    pub input: R,
14    pub seq_buf: Vec<u8>,
15    pub head_buf: Vec<u8>,
16    pub qual_buf: Vec<u8>,
17    pub plus_buf: Vec<u8>, // For the fastq plus-line
18    pub fasta_temp_buf: Vec<u8>, // Stores the fasta header read in the previous iteration
19}
20
21pub trait SeqStream{
22    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>;
23}
24
25// Trait for a stream returning RefRecord objects, used in DynamicFastXReader to abstract over
26// The input stream type.
27trait JSeqIOReaderInterface{
28    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>;
29    fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>;
30
31    // Since we want to call this for trait objects where we don't know the size of the struct,
32    // We need to take self in a Box.
33    fn into_db_boxed(self: Box<Self>) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>;
34    fn into_db_with_revcomp_boxed(self: Box<Self>) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>;
35
36    fn filetype(&self)-> FileType; 
37
38    // For error messages
39    fn set_filepath(&mut self, filepath: &Path);
40}
41
42#[derive(Debug)]
43pub struct ParseError{
44    pub message: String,
45    pub filename: Option<String>,
46    pub filetype: Option<FileType>,
47}
48
49impl std::error::Error for ParseError{}
50
51impl std::fmt::Display for ParseError{
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(f, "{:?}", self)
54    }
55}
56
57impl<R: std::io::BufRead> StaticFastXReader<R>{
58
59    fn build_parse_error(&self, message: &str) -> Box<ParseError>{
60        Box::new(
61            ParseError{
62                message: message.to_owned(), 
63                filename: self.filename.clone(), 
64                filetype: Some(self.filetype)
65            }
66        )
67    }
68
69    fn read_fasta_record(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>{
70        self.seq_buf.clear();
71        self.head_buf.clear();
72
73        // Read header line
74        if self.fasta_temp_buf.is_empty() {
75            // This is the first record -> read header from input
76            let bytes_read = self.input.read_until(b'\n', &mut self.head_buf)?;
77            if bytes_read == 0 {return Ok(None)} // End of stream
78        } else{
79            // Take stashed header from previous iteration
80            self.head_buf.append(&mut self.fasta_temp_buf); // Also clears the temp buf
81        }
82
83        // Read sequence line
84        loop{
85            let bytes_read = self.input.read_until(b'\n', &mut self.fasta_temp_buf)?;
86            if bytes_read == 0 {
87                // No more bytes left to read
88                if self.seq_buf.is_empty(){
89                    // Stream ends with an empty sequence
90                    return Err(self.build_parse_error("Empty sequence in FASTA file"));
91                }
92                break; // Ok, last record of the file
93            }
94
95            // Check if we read the header of the next read
96            let start = self.fasta_temp_buf.len() as isize - bytes_read as isize;
97            if self.fasta_temp_buf[start as usize] == b'>'{
98                // Found a header. Leave it to the buffer for the next iteration.
99                break;
100            } else{
101                // Found more sequence -> Append to self.seq_buf
102                self.seq_buf.append(&mut self.fasta_temp_buf); // Also clears the temp buf
103                self.seq_buf.pop(); // Trim newline (TODO: what if there is none?)
104            }
105        }
106
107        // Make sure all letters are in same case
108        for c in self.seq_buf.iter_mut() {
109            c.make_ascii_uppercase();
110        }
111
112        // Trim '>' and '\n' from the header 
113        assert!(self.head_buf[0] == b'>');
114        assert!(*self.head_buf.last().unwrap() == b'\n');
115        let head_buf_len = self.head_buf.len();
116        let head = &mut self.head_buf.as_mut_slice()[1..head_buf_len-1]; // Remove '>' and '\n'
117
118        Ok(Some(MutRefRecord{head: head, 
119                            seq: self.seq_buf.as_mut_slice(), // Newlines are already trimmed before
120                            qual: None}))
121    }
122
123    fn read_fastq_record(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>{
124
125        self.seq_buf.clear();
126        self.head_buf.clear();
127        self.qual_buf.clear();
128        self.plus_buf.clear();
129
130        // Read header line
131        let bytes_read = self.input.read_until(b'\n', &mut self.head_buf)?;
132        if bytes_read == 0 {return Ok(None)} // End of stream
133        if self.head_buf[0] != b'@'{
134            return Err(self.build_parse_error("FASTQ header line does not start with @"));
135        }
136        assert!(self.head_buf.last().unwrap() == &b'\n');
137        self.head_buf.pop(); // Trim the '\n'
138
139        // Read sequence line
140        let bytes_read = self.input.read_until(b'\n', &mut self.seq_buf)?;
141        if bytes_read == 0 {
142            return Err(self.build_parse_error("FASTQ sequence line missing.")); // File can't end here
143        }
144        assert!(self.seq_buf.last().unwrap() == &b'\n');
145        self.seq_buf.pop(); // Trim the '\n'
146        
147        // read +-line
148        let bytes_read = self.input.read_until(b'\n', &mut self.plus_buf)?;
149        if bytes_read == 0 {
150            return Err(self.build_parse_error("FASTQ + line missing.")); // File can't end here
151        }
152
153        // read qual-line
154        let bytes_read = self.input.read_until(b'\n', &mut self.qual_buf)?;
155        if bytes_read == 0 { // File can't end here
156            return Err(self.build_parse_error("FASTQ quality line missing.")); // File can't end here
157        } 
158        assert!(self.qual_buf.last().unwrap() == &b'\n');
159        self.qual_buf.pop(); // Trim the '\n'
160        if self.qual_buf.len() != self.seq_buf.len() {
161            let msg = format!("FASTQ quality line has different length than sequence line ({} vs {})", self.qual_buf.len(), self.seq_buf.len());
162            return Err(self.build_parse_error(&msg));
163        }
164
165        // Make sure all letters are in same case
166        for c in self.seq_buf.iter_mut() {
167            c.make_ascii_uppercase();
168        }
169
170        assert!(self.head_buf[0] == b'@');
171        Ok(Some(MutRefRecord{head: &mut self.head_buf[1..], // Remove '@'
172                            seq: &mut self.seq_buf,
173                            qual: Some(&mut self.qual_buf)}))
174    }
175
176    // Read one record from the input.
177    // This is not named just next() because it's not a Rust iterator because it streams the input.
178    pub fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
179        match self.filetype{
180            FileType::FASTA => self.read_fasta_record().map(|opt| opt.map(|rec| rec.into_shared_ref())),
181            FileType::FASTQ => self.read_fastq_record().map(|opt| opt.map(|rec| rec.into_shared_ref())),
182        }
183    }
184
185    // Read one record from the input.
186    // This is not named just next() because it's not a Rust iterator because it streams the input.
187    pub fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>> {
188        match self.filetype{
189            FileType::FASTA => self.read_fasta_record(),
190            FileType::FASTQ => self.read_fastq_record(),
191        }
192    }
193
194    // New with known format
195    fn new_with_format(input: R, filetype: FileType) -> Self{
196        StaticFastXReader{filetype,
197                    input,
198                    filename: None,
199                    seq_buf: Vec::<u8>::new(),
200                    head_buf: Vec::<u8>::new(),
201                    qual_buf: Vec::<u8>::new(),
202                    plus_buf: Vec::<u8>::new(),
203                    fasta_temp_buf: Vec::<u8>::new(),}
204    }
205
206    // Detect whether it's fasta or FASTQ based on the first byte.
207    pub fn new(mut input: R) -> Result<Self, Box<dyn std::error::Error>>{
208        let bytes = input.fill_buf()?;
209
210        // Empty file is arbitrarily considered as FASTA
211        let mut filetype = FileType::FASTA;
212
213        if !bytes.is_empty(){
214            filetype = match bytes[0]{
215                b'>' => FileType::FASTA,
216                b'@' => FileType::FASTQ,
217                _ => return Err(
218                        Box::new(ParseError{message: "Error: File does not start with '>' or '@'".to_owned(), 
219                        filename: None, 
220                        filetype: None}))
221            } 
222        }
223
224        Ok(StaticFastXReader::new_with_format(input, filetype))
225
226    }
227
228    pub fn into_db_with_revcomp(mut self) -> Result<(SeqDB, SeqDB), Box<dyn std::error::Error>>{
229
230        // Reusable record for storing the reverse complement
231        let mut rc_record = 
232        OwnedRecord{
233            head: Vec::new(), 
234            seq: Vec::new(),
235            qual: match self.filetype{
236                FileType::FASTA => None,
237                FileType::FASTQ => Some(Vec::new()),
238            }
239        };
240
241        let mut fw_db = SeqDB::new();
242        let mut rc_db = SeqDB::new();
243
244        while let Some(rec) = self.read_next()?{
245            fw_db.push_record(rec);
246
247            // Copy fw record data to rc record
248            rc_record.head.clear();
249            rc_record.head.extend_from_slice(rec.head);
250            rc_record.seq.clear();
251            rc_record.seq.extend_from_slice(rec.seq);
252            if let Some(qual) = &mut rc_record.qual{
253                qual.clear();
254                qual.extend_from_slice(rec.qual.unwrap());
255            }
256
257            // Reverse complement and push to database
258            rc_record.reverse_complement();
259            rc_db.push_record(rc_record.as_ref_record());
260        }
261        fw_db.shrink_to_fit();
262        rc_db.shrink_to_fit();
263        Ok((fw_db, rc_db))
264    }
265
266    pub fn into_db(mut self) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
267        let mut db = SeqDB::new();
268
269        while let Some(rec) = self.read_next()?{
270            db.push_record(rec);
271        }
272        db.shrink_to_fit();
273        Ok(db)
274    }
275
276
277
278}
279
280
281pub struct DynamicFastXReader {
282    stream: Box<dyn JSeqIOReaderInterface + Send>,
283    compression_type: crate::CompressionType,
284}
285
286// A class that contains a dynamic trait object for different
287// types of input streams.
288impl DynamicFastXReader {
289 
290    // New from file
291    pub fn from_file<P: AsRef<std::path::Path>>(filepath: &P) -> Result<Self, Box<dyn std::error::Error>> {
292        let input = File::open(filepath).unwrap();
293        let mut reader = Self::new(BufReader::new(input))?;
294        reader.stream.set_filepath(filepath.as_ref());
295        Ok(reader)
296    }
297
298    // New from stdin
299    pub fn from_stdin() -> Result<Self, Box<dyn std::error::Error>> {
300        let input = std::io::stdin();
301        let reader = Self::new(BufReader::new(input))?;
302        Ok(reader)
303    }
304
305    // New from stream, with automatic gzip detection
306    pub fn new<R: std::io::BufRead + 'static + Send>(mut input: R) -> Result<Self, Box<dyn std::error::Error>>{
307        let bytes = input.fill_buf()?;
308        let mut gzipped = false;
309        match bytes.len(){
310            0 => (), // Empty file
311            1 => return Err(Box::new(ParseError{message: "Corrupt FASTA/FASTQ file: only one byte found.".to_owned(), 
312                        filename: None, 
313                        filetype: None})),
314            _ => { // Two or more bytes available. Check if the first two are a valid gzip header.
315                if bytes[0] == 0x1f && bytes[1] == 0x8b{ 
316                    gzipped = true;
317                }
318            }
319        }
320
321        match gzipped{
322            true => {
323                let gzdecoder = MultiGzDecoder::<R>::new(input);
324
325                // We wrap this in BufReader because the FastX parser requires buffered reading
326                let gzbufdecoder = BufReader::<MultiGzDecoder::<R>>::new(gzdecoder);
327                Self::from_raw_stream(gzbufdecoder, crate::CompressionType::Gzip)
328            },
329            false => Self::from_raw_stream(input, crate::CompressionType::None)
330        }
331    }
332
333    // Creates a reader from a raw stream of uncompressed data (no gzip detection). Used by other constructors.
334    // Need to constrain + 'static because boxed trait objects always need to have a static lifetime.
335    fn from_raw_stream<R: std::io::BufRead + 'static + Send>(r: R, compression_type: crate::CompressionType) -> Result<Self, Box<dyn std::error::Error>>{
336        let reader = StaticFastXReader::<R>::new(r)?;
337        Ok(DynamicFastXReader {stream: Box::new(reader), compression_type})
338    }
339
340    pub fn into_db(self) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
341        self.stream.into_db_boxed()
342    }
343
344    pub fn into_db_with_revcomp(self) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>{
345        self.stream.into_db_with_revcomp_boxed()
346    }
347
348    pub fn compression_type(&self) -> crate::CompressionType{
349        self.compression_type
350    }
351
352    pub fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
353        self.stream.read_next()
354    }
355
356    pub fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>>{
357        self.stream.read_next_mut()
358    }
359
360    pub fn filetype(&self)-> FileType{
361        self.stream.filetype()
362    }
363    
364    // For error messages
365    pub fn set_filepath(&mut self, filepath: &Path){
366        self.stream.set_filepath(filepath);
367    }
368}
369
370
371// Implement common SeqRecordProducer trait for all
372// StaticFastXReaders over the generic parameter R.
373impl<R: BufRead> JSeqIOReaderInterface for StaticFastXReader<R>{
374
375    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>>{
376        self.read_next()
377    }
378
379    fn read_next_mut(&mut self) -> Result<Option<MutRefRecord>, Box<dyn std::error::Error>> {
380        self.read_next_mut()
381    }
382
383    fn filetype(&self)-> FileType{
384        self.filetype
385    }
386
387    fn into_db_boxed(self: Box<Self>) -> Result<crate::seq_db::SeqDB, Box<dyn std::error::Error>>{
388        self.into_db()
389    }
390
391    fn into_db_with_revcomp_boxed(self: Box<Self>) -> Result<(crate::seq_db::SeqDB, crate::seq_db::SeqDB), Box<dyn std::error::Error>>{
392        self.into_db_with_revcomp()
393    }    
394
395    // For error messages
396    fn set_filepath(&mut self, filepath: &Path){
397        self.filename = Some(filepath.as_os_str().to_str().unwrap().to_owned());
398    }
399
400}
401
402impl SeqStream for DynamicFastXReader {
403    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
404        DynamicFastXReader::read_next(self)
405    }
406}
407
408impl<R: BufRead> SeqStream for StaticFastXReader<R> {
409    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
410        StaticFastXReader::read_next(self)
411    }
412}
413
414// Turns a SeqStream into another SeqStream that also streams the reverse
415// complements of the original stream. See read_next() for more information.
416pub struct SeqStreamWithRevComp<S: SeqStream> {
417    inner: S,
418    rec: OwnedRecord,
419    parity: bool, // Every other sequence we return is a reverse complement of the previous
420}
421
422impl<S: SeqStream> SeqStreamWithRevComp<S> {
423    pub fn new(inner: S) -> Self{
424        Self{
425            inner,
426            rec: OwnedRecord{seq: Vec::new(), head: Vec::new(), qual: None},
427            parity: false,
428        }
429    }
430}
431
432impl<S: SeqStream> SeqStream for SeqStreamWithRevComp<S> {
433    // If the original sequence stream is x1,x2,x3..., returns sequences in the order
434    // x1, rc(x1), x2, rc(x2), x3, rc(x3)...
435    fn read_next(&mut self) -> Result<Option<RefRecord>, Box<dyn std::error::Error>> {
436        self.parity = !self.parity;
437
438        if self.parity {
439            let new = match self.inner.read_next()? {
440                None => return Ok(None), // End of stream
441                Some(r) => r
442            };
443
444            // Copy the record to self storage for reverse complementation later
445            self.rec.head.clear();
446            self.rec.head.extend(new.head);
447            self.rec.seq.clear();
448            self.rec.seq.extend(new.seq);
449            if let Some(q) = new.qual {
450                if self.rec.qual.is_none() {
451                    self.rec.qual = Some(Vec::<u8>::new());
452                }
453                self.rec.qual.as_mut().unwrap().clear();
454                self.rec.qual.as_mut().unwrap().extend(q);
455            }
456
457            return Ok(Some(self.rec.as_ref_record()));
458
459        } else {
460            crate::reverse_complement_in_place(&mut self.rec.seq);
461            return Ok(Some(self.rec.as_ref_record()));
462        }
463    }
464}