bio_streams/
fastq.rs

1use core::marker::{PhantomData, Unpin};
2
3use std::io::BufRead;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::Stream;
8
9use bio_seq::prelude::*;
10
11use crate::record::Phred;
12use crate::{FastxError, Reader, Record};
13
14pub struct Fastq<R: BufRead, T = Seq<Dna>>
15where
16    T: for<'a> TryFrom<&'a [u8]>,
17{
18    reader: Pin<Box<R>>,
19    id_buf: Vec<u8>,
20    seq_buf: Vec<u8>,
21    sep_buf: Vec<u8>,
22    qual_buf: Vec<u8>,
23    p: PhantomData<T>,
24}
25
26impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Unpin for Fastq<R, T> {}
27
28impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fastq<R, T> {
29    pub fn new(reader: R) -> Self {
30        Fastq {
31            reader: Box::pin(reader),
32            id_buf: Vec::<u8>::with_capacity(256),
33            seq_buf: Vec::<u8>::with_capacity(512),
34            sep_buf: Vec::<u8>::with_capacity(4),
35            qual_buf: Vec::<u8>::with_capacity(512),
36            p: PhantomData,
37        }
38    }
39
40    fn parse_record(&mut self) -> Option<Result<Record<T>, FastxError>> {
41        let mut quality = Vec::<Phred>::new();
42        let reader = Pin::get_mut(self.reader.as_mut());
43
44        self.id_buf.clear();
45        self.seq_buf.clear();
46        self.sep_buf.clear();
47        self.qual_buf.clear();
48
49        if reader.read_until(b'\n', &mut self.id_buf).is_err() {
50            return Some(Err(FastxError::FileError));
51        }
52        if self.id_buf.is_empty() {
53            // This is the only condition where an empty reader means
54            // that the file has successfully finished reading
55            return None;
56        }
57        // The id line must begin with '@'
58        if self.id_buf[0] != b'@' {
59            return Some(Err(FastxError::InvalidId(
60                String::from_utf8_lossy(&self.id_buf).into_owned(),
61            )));
62        }
63
64        if reader.read_until(b'\n', &mut self.seq_buf).is_err() {
65            return Some(Err(FastxError::FileError));
66        }
67        if self.seq_buf.is_empty() {
68            return Some(Err(FastxError::TruncatedRecord));
69        }
70
71        if reader.read_until(b'\n', &mut self.sep_buf).is_err() {
72            return Some(Err(FastxError::FileError));
73        }
74        if self.sep_buf.is_empty() {
75            return Some(Err(FastxError::TruncatedRecord));
76        }
77
78        // Detect whether the '+' separation line is valid
79        if self.sep_buf.len() != 2 || self.sep_buf[0] != b'+' {
80            return Some(Err(FastxError::InvalidSeparationLine));
81        }
82        if reader.read_until(b'\n', &mut self.qual_buf).is_err() {
83            return Some(Err(FastxError::FileError));
84        }
85        if self.qual_buf.is_empty() {
86            return Some(Err(FastxError::TruncatedRecord));
87        }
88
89        // Parse the contents of the sequence and quality lines
90        if self.qual_buf.len() != self.seq_buf.len() {
91            return Some(Err(FastxError::InvalidQuality));
92        }
93
94        let seq = match T::try_from(&self.seq_buf[..self.seq_buf.len() - 1]) {
95            Ok(parsed_seq) => parsed_seq,
96            Err(_) => {
97                return Some(Err(FastxError::InvalidSequence(
98                    String::from_utf8_lossy(&self.seq_buf).into_owned(),
99                )))
100            }
101        };
102
103        quality.extend(
104            self.qual_buf[..self.qual_buf.len() - 1]
105                .iter()
106                .map(|q| Phred::from(*q)),
107        );
108
109        Some(Ok(Record {
110            fields: Vec::<u8>::from(&self.id_buf[1..self.id_buf.len() - 1]),
111            seq,
112            quality: Some(quality),
113        }))
114    }
115}
116
117impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Iterator for Fastq<R, T> {
118    type Item = Result<Record<T>, FastxError>;
119
120    fn next(&mut self) -> Option<Self::Item> {
121        self.parse_record()
122    }
123}
124
125impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fastq<R, T> {
126    type Item = Result<Record<T>, FastxError>;
127
128    fn poll_next(
129        self: Pin<&mut Self>,
130        _cx: &mut Context<'_>,
131    ) -> Poll<Option<<Self as Stream>::Item>> {
132        let record = unsafe { self.get_unchecked_mut().parse_record() };
133
134        Poll::Ready(record)
135    }
136}
137
138impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Reader<T> for Fastq<R, T> {}
139
#[cfg(test)]
mod tests {
    use super::*;
    use futures_test::task::noop_waker;
    use std::io::Cursor;
    use std::task::{Context, Poll};

    /// Two well-formed records, each line terminated by `\n`.
    const TWO_RECORDS: &[u8] = b"@SEQ_ID_1\nACTCGATCGCGACG\n+\nFFFFFFFFFFFFFF\n\
        @SEQ_ID_2\nCATCGACTACGGCG\n+\nGGGGGGGGGGGGGG\n";

    /// Builds a DNA FASTQ parser over an in-memory byte slice.
    fn parser(data: &[u8]) -> Fastq<Cursor<&[u8]>, Seq<Dna>> {
        Fastq::new(Cursor::new(data))
    }

    #[test]
    fn test_fastq_iterator() {
        let mut fastq = parser(TWO_RECORDS);

        let record1 = fastq.next().unwrap().unwrap();
        assert_eq!(record1.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(record1.seq, dna!("ACTCGATCGCGACG"));
        // One quality score per base.
        assert_eq!(record1.quality.as_ref().map(Vec::len), Some(14));

        let record2 = fastq
            .next()
            .expect("Expected a record")
            .expect("Expected valid record");
        assert_eq!(record2.fields, b"SEQ_ID_2".to_vec());
        assert_eq!(record2.seq, dna!("CATCGACTACGGCG"));
        assert_eq!(record2.quality.as_ref().map(Vec::len), Some(14));

        assert!(fastq.next().is_none(), "Expected no more records");
    }

    #[test]
    fn test_fastq_poll_next() {
        let mut fastq = Box::pin(parser(TWO_RECORDS));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Manual polling using poll_next
        match fastq.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
                assert_eq!(record.seq, dna!("ACTCGATCGCGACG"));
                assert_eq!(record.quality.as_ref().map(Vec::len), Some(14));
            }
            other => panic!("Unexpected result: {:?}", other),
        }

        match fastq.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_2".to_vec());
                assert_eq!(record.seq, dna!("CATCGACTACGGCG"));
            }
            other => panic!("Unexpected result: {:?}", other),
        }

        assert_eq!(fastq.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }

    #[test]
    fn test_empty_input_yields_no_records() {
        assert!(parser(b"").next().is_none());
    }

    #[test]
    fn test_header_without_at_sign_is_invalid_id() {
        let mut fastq = parser(b"SEQ_ID_1\nACGT\n+\nFFFF\n");
        assert!(matches!(fastq.next(), Some(Err(FastxError::InvalidId(_)))));
    }

    #[test]
    fn test_truncated_record() {
        let mut fastq = parser(b"@SEQ_ID_1\nACGT\n");
        assert!(matches!(
            fastq.next(),
            Some(Err(FastxError::TruncatedRecord))
        ));
    }

    #[test]
    fn test_bad_separator_line() {
        let mut fastq = parser(b"@SEQ_ID_1\nACGT\n-\nFFFF\n");
        assert!(matches!(
            fastq.next(),
            Some(Err(FastxError::InvalidSeparationLine))
        ));
    }

    #[test]
    fn test_quality_length_mismatch() {
        let mut fastq = parser(b"@SEQ_ID_1\nACGT\n+\nFFF\n");
        assert!(matches!(fastq.next(), Some(Err(FastxError::InvalidQuality))));
    }
}