1use core::marker::{PhantomData, Unpin};
2
3use std::io::BufRead;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::Stream;
8
9use bio_seq::prelude::*;
10
11use crate::{FastxError, Reader, Record};
12
13pub struct Fasta<R: BufRead, T = Seq<Dna>>
14where
15 T: for<'a> TryFrom<&'a [u8]>,
16{
17 reader: Pin<Box<R>>,
18 line_buf: Vec<u8>,
19 field_buf: Option<Vec<u8>>,
20 p: PhantomData<T>,
21}
22
/// Returns the length of `line_buf` with one trailing line terminator
/// (`\r\n` or `\n`) excluded.
///
/// A buffer without a trailing `\n` is returned at its full length, so a
/// lone trailing `\r` is counted as content.
fn end_pos(line_buf: &[u8]) -> usize {
    match line_buf {
        // CRLF must be tested first: such a line also ends in `\n`.
        [content @ .., b'\r', b'\n'] => content.len(),
        [content @ .., b'\n'] => content.len(),
        unterminated => unterminated.len(),
    }
}
32
33impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Unpin for Fasta<R, T> {}
34
35impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fasta<R, T> {
36 pub fn new(reader: R) -> Self {
37 Fasta {
38 reader: Box::pin(reader),
39 line_buf: Vec::<u8>::with_capacity(256),
40 field_buf: None,
41 p: PhantomData,
42 }
43 }
44
45 fn parse_record(&mut self) -> Option<Result<Record<T>, FastxError>> {
46 let reader = Pin::get_mut(self.reader.as_mut());
47
48 let mut seq_buf: Vec<u8> = Vec::new();
49 let fields: Vec<u8> = if self.field_buf.is_none() {
51 self.line_buf.clear();
52 if let Ok(size) = reader.read_until(b'\n', &mut self.line_buf) {
53 if size == 0 {
54 return None;
56 }
57
58 if self.line_buf[0] == b'>' {
59 Vec::from(&self.line_buf[1..end_pos(&self.line_buf)])
61 } else {
62 return Some(Err(FastxError::InvalidId(
63 String::from_utf8_lossy(&self.line_buf).into_owned(),
64 )));
65 }
66 } else {
67 return Some(Err(FastxError::TruncatedRecord));
69 }
70 } else {
71 self.field_buf.take().unwrap()
72 };
73
74 self.line_buf.clear();
76 while let Ok(size) = reader.read_until(b'\n', &mut self.line_buf) {
77 if size == 0 {
78 break;
80 }
81 if self.line_buf[0] == b'>' {
82 self.field_buf = Some(Vec::from(&self.line_buf[1..end_pos(&self.line_buf)]));
84 break;
85 } else {
86 seq_buf.extend_from_slice(&self.line_buf[..end_pos(&self.line_buf)]);
88 self.line_buf.clear();
89 }
90 }
91
92 let seq = match T::try_from(&seq_buf) {
93 Ok(s) => s,
94 Err(_) => {
95 return Some(Err(FastxError::InvalidSequence("TODO".to_string())));
96 }
97 };
98 Some(Ok(Record {
99 fields,
100 seq,
101 quality: None,
102 }))
103 }
104}
105
106impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Iterator for Fasta<R, T> {
107 type Item = Result<Record<T>, FastxError>;
108
109 fn next(&mut self) -> Option<Self::Item> {
110 self.parse_record()
111 }
112}
113
114impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fasta<R, T> {
115 type Item = Result<Record<T>, FastxError>;
116
117 fn poll_next(
118 self: Pin<&mut Self>,
119 _cx: &mut Context<'_>,
120 ) -> Poll<Option<<Self as Stream>::Item>> {
121 let record = unsafe { self.get_unchecked_mut().parse_record() };
122
123 Poll::Ready(record)
124 }
125}
126
127impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Reader<T> for Fasta<R, T> {}
128
#[cfg(test)]
mod tests {
    use super::*;
    use futures_test::task::noop_waker;
    use std::io::Cursor;
    use std::task::{Context, Poll};

    /// Parser type exercised by every test in this module.
    type DnaFasta<'a> = Fasta<Cursor<&'a [u8]>, Seq<Dna>>;

    /// Wraps `data` in a cursor-backed parser and pins it on the heap.
    fn pinned_fasta(data: &[u8]) -> Pin<Box<DnaFasta<'_>>> {
        Box::pin(Fasta::new(Cursor::new(data)))
    }

    /// Two multi-line records read through the `Iterator` interface.
    #[test]
    fn test_fasta_iterator() {
        let data = b">SEQ_ID_1
ACTCGATCGCGACG
ACACGATCGCGCGC
CATCGACTACGGCG
>SEQ_ID_2
GGGGGGGGGGGGGG\n";
        let mut fasta: DnaFasta<'_> = Fasta::new(Cursor::new(data as &[u8]));

        let first = fasta.next().unwrap().unwrap();
        assert_eq!(first.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(
            first.seq,
            dna!("ACTCGATCGCGACGACACGATCGCGCGCCATCGACTACGGCG")
        );

        let second = fasta
            .next()
            .expect("Expected a record")
            .expect("Expected valid record");
        assert_eq!(second.fields, b"SEQ_ID_2".to_vec());
        assert_eq!(second.seq, dna!("GGGGGGGGGGGGGG"));

        assert!(fasta.next().is_none(), "Expected no more records");
    }

    /// Two records with `\n` line endings read through `Stream::poll_next`.
    #[test]
    fn test_fasta_poll_next() {
        let data = b">SEQ_ID_1
AAAAAAAAAAAAAA
CCCCCCCCCCCCC
GGGGGGGGGGGG
>SEQ_ID_2
TTTTTTTTTTT\n";
        let mut fasta = pinned_fasta(data);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let first = fasta.as_mut().poll_next(&mut cx);
        match first {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
                assert_eq!(record.seq, dna!("AAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGG"));
            }
            _ => panic!("Unexpected result"),
        }

        let second = fasta.as_mut().poll_next(&mut cx);
        match second {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_2".to_vec());
                assert_eq!(record.seq, dna!("TTTTTTTTTTT"));
            }
            e => panic!("Unexpected result {:?}", e),
        }

        assert_eq!(fasta.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }

    /// Same input as above, but with `\r\n` line endings and no final newline.
    #[test]
    fn test_fasta_poll_next_with_crlf() {
        let data = b">SEQ_ID_1\r\nAAAAAAAAAAAAAA\r\nCCCCCCCCCCCCC\r\nGGGGGGGGGGGG\r\n>SEQ_ID_2\r\nTTTTTTTTTTT";
        let mut fasta = pinned_fasta(data);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let first = fasta.as_mut().poll_next(&mut cx);
        match first {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
                assert_eq!(record.seq, dna!("AAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGG"));
            }
            _ => panic!("Unexpected result"),
        }

        let second = fasta.as_mut().poll_next(&mut cx);
        match second {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_2".to_vec());
                assert_eq!(record.seq, dna!("TTTTTTTTTTT"));
            }
            e => panic!("Unexpected result {:?}", e),
        }

        assert_eq!(fasta.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }

    /// A single record whose sequence line is not newline-terminated.
    #[test]
    fn test_fasta_poll_next_no_eol() {
        let data = b">SEQ_ID_X\nAAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGGACGTAAA";
        let mut fasta = pinned_fasta(data);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let only = fasta.as_mut().poll_next(&mut cx);
        match only {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_X".to_vec());
                assert_eq!(
                    record.seq,
                    dna!("AAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGGACGTAAA")
                );
            }
            _ => panic!("Unexpected result"),
        }

        assert_eq!(fasta.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }
}