1use core::marker::{PhantomData, Unpin};
2
3use std::io::BufRead;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::Stream;
8
9use bio_seq::prelude::*;
10
11use crate::record::Phred;
12use crate::{FastxError, Reader, Record};
13
14pub struct Fastq<R: BufRead, T = Seq<Dna>>
15where
16 T: for<'a> TryFrom<&'a [u8]>,
17{
18 reader: Pin<Box<R>>,
19 id_buf: Vec<u8>,
20 seq_buf: Vec<u8>,
21 sep_buf: Vec<u8>,
22 qual_buf: Vec<u8>,
23 p: PhantomData<T>,
24}
25
26impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Unpin for Fastq<R, T> {}
27
28impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fastq<R, T> {
29 pub fn new(reader: R) -> Self {
30 Fastq {
31 reader: Box::pin(reader),
32 id_buf: Vec::<u8>::with_capacity(256),
33 seq_buf: Vec::<u8>::with_capacity(512),
34 sep_buf: Vec::<u8>::with_capacity(4),
35 qual_buf: Vec::<u8>::with_capacity(512),
36 p: PhantomData,
37 }
38 }
39
40 fn parse_record(&mut self) -> Option<Result<Record<T>, FastxError>> {
41 let mut quality = Vec::<Phred>::new();
42 let reader = Pin::get_mut(self.reader.as_mut());
43
44 self.id_buf.clear();
45 self.seq_buf.clear();
46 self.sep_buf.clear();
47 self.qual_buf.clear();
48
49 if reader.read_until(b'\n', &mut self.id_buf).is_err() {
50 return Some(Err(FastxError::FileError));
51 }
52 if self.id_buf.is_empty() {
53 return None;
56 }
57 if self.id_buf[0] != b'@' {
59 return Some(Err(FastxError::InvalidId(
60 String::from_utf8_lossy(&self.id_buf).into_owned(),
61 )));
62 }
63
64 if reader.read_until(b'\n', &mut self.seq_buf).is_err() {
65 return Some(Err(FastxError::FileError));
66 }
67 if self.seq_buf.is_empty() {
68 return Some(Err(FastxError::TruncatedRecord));
69 }
70
71 if reader.read_until(b'\n', &mut self.sep_buf).is_err() {
72 return Some(Err(FastxError::FileError));
73 }
74 if self.sep_buf.is_empty() {
75 return Some(Err(FastxError::TruncatedRecord));
76 }
77
78 if self.sep_buf.len() != 2 || self.sep_buf[0] != b'+' {
80 return Some(Err(FastxError::InvalidSeparationLine));
81 }
82 if reader.read_until(b'\n', &mut self.qual_buf).is_err() {
83 return Some(Err(FastxError::FileError));
84 }
85 if self.qual_buf.is_empty() {
86 return Some(Err(FastxError::TruncatedRecord));
87 }
88
89 if self.qual_buf.len() != self.seq_buf.len() {
91 return Some(Err(FastxError::InvalidQuality));
92 }
93
94 let seq = match T::try_from(&self.seq_buf[..self.seq_buf.len() - 1]) {
95 Ok(parsed_seq) => parsed_seq,
96 Err(_) => {
97 return Some(Err(FastxError::InvalidSequence(
98 String::from_utf8_lossy(&self.seq_buf).into_owned(),
99 )))
100 }
101 };
102
103 quality.extend(
104 self.qual_buf[..self.qual_buf.len() - 1]
105 .iter()
106 .map(|q| Phred::from(*q)),
107 );
108
109 Some(Ok(Record {
110 fields: Vec::<u8>::from(&self.id_buf[1..self.id_buf.len() - 1]),
111 seq,
112 quality: Some(quality),
113 }))
114 }
115}
116
117impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Iterator for Fastq<R, T> {
118 type Item = Result<Record<T>, FastxError>;
119
120 fn next(&mut self) -> Option<Self::Item> {
121 self.parse_record()
122 }
123}
124
125impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fastq<R, T> {
126 type Item = Result<Record<T>, FastxError>;
127
128 fn poll_next(
129 self: Pin<&mut Self>,
130 _cx: &mut Context<'_>,
131 ) -> Poll<Option<<Self as Stream>::Item>> {
132 let record = unsafe { self.get_unchecked_mut().parse_record() };
133
134 Poll::Ready(record)
135 }
136}
137
138impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Reader<T> for Fastq<R, T> {}
139
#[cfg(test)]
mod tests {
    use super::*;
    use futures_test::task::noop_waker;
    use std::io::Cursor;
    use std::task::{Context, Poll};

    /// Two well-formed, newline-terminated records shared by both tests.
    const TWO_RECORDS: &[u8] =
        b"@SEQ_ID_1\nACTCGATCGCGACG\n+\nFFFFFFFFFFFFFF\n@SEQ_ID_2\nCATCGACTACGGCG\n+\nGGGGGGGGGGGGGG\n";

    /// Builds a DNA FASTQ parser over the shared in-memory fixture.
    fn two_record_parser() -> Fastq<Cursor<&'static [u8]>, Seq<Dna>> {
        Fastq::new(Cursor::new(TWO_RECORDS))
    }

    /// The `Iterator` interface yields both records in order, then `None`.
    #[test]
    fn test_fastq_iterator() {
        let mut parser = two_record_parser();

        let first = parser.next().unwrap().unwrap();
        assert_eq!(first.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(first.seq, dna!("ACTCGATCGCGACG"));

        let second = parser
            .next()
            .expect("Expected a record")
            .expect("Expected valid record");
        assert_eq!(second.fields, b"SEQ_ID_2".to_vec());
        assert_eq!(second.seq, dna!("CATCGACTACGGCG"));

        assert!(parser.next().is_none(), "Expected no more records");
    }

    /// The `Stream` interface yields the same records through `poll_next`.
    #[test]
    fn test_fastq_poll_next() {
        let mut parser = Box::pin(two_record_parser());

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let first = match parser.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => record,
            _ => panic!("Unexpected result"),
        };
        assert_eq!(first.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(first.seq, dna!("ACTCGATCGCGACG"));

        let second = match parser.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => record,
            _ => panic!("Unexpected result"),
        };
        assert_eq!(second.fields, b"SEQ_ID_2".to_vec());
        assert_eq!(second.seq, dna!("CATCGACTACGGCG"));

        assert_eq!(parser.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }
}