bio_streams/
fasta.rs

1use core::marker::{PhantomData, Unpin};
2
3use std::io::BufRead;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::Stream;
8
9use bio_seq::prelude::*;
10
11use crate::{FastxError, Reader, Record};
12
13pub struct Fasta<R: BufRead, T = Seq<Dna>>
14where
15    T: for<'a> TryFrom<&'a [u8]>,
16{
17    reader: Pin<Box<R>>,
18    line_buf: Vec<u8>,
19    field_buf: Option<Vec<u8>>,
20    p: PhantomData<T>,
21}
22
/// Returns the length of `line_buf` once a single trailing line terminator
/// (`\r\n` or `\n`) is excluded.
///
/// A lone trailing `\r` is not treated as a terminator and is kept.
fn end_pos(line_buf: &[u8]) -> usize {
    let content = line_buf
        .strip_suffix(b"\r\n")
        .or_else(|| line_buf.strip_suffix(b"\n"))
        .unwrap_or(line_buf);
    content.len()
}
32
33impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Unpin for Fasta<R, T> {}
34
35impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fasta<R, T> {
36    pub fn new(reader: R) -> Self {
37        Fasta {
38            reader: Box::pin(reader),
39            line_buf: Vec::<u8>::with_capacity(256),
40            field_buf: None,
41            p: PhantomData,
42        }
43    }
44
45    fn parse_record(&mut self) -> Option<Result<Record<T>, FastxError>> {
46        let reader = Pin::get_mut(self.reader.as_mut());
47
48        let mut seq_buf: Vec<u8> = Vec::new();
49        // if field_buf is None, first line should start with '>'
50        let fields: Vec<u8> = if self.field_buf.is_none() {
51            self.line_buf.clear();
52            if let Ok(size) = reader.read_until(b'\n', &mut self.line_buf) {
53                if size == 0 {
54                    // end of stream before a record (empty stream)
55                    return None;
56                }
57
58                if self.line_buf[0] == b'>' {
59                    //                    let end = end_pos(&self.line_buf);
60                    Vec::from(&self.line_buf[1..end_pos(&self.line_buf)])
61                } else {
62                    return Some(Err(FastxError::InvalidId(
63                        String::from_utf8_lossy(&self.line_buf).into_owned(),
64                    )));
65                }
66            } else {
67                // premature end of fasta?
68                return Some(Err(FastxError::TruncatedRecord));
69            }
70        } else {
71            self.field_buf.take().unwrap()
72        };
73
74        // Read the next non-'>' lines into the sequence buffer
75        self.line_buf.clear();
76        while let Ok(size) = reader.read_until(b'\n', &mut self.line_buf) {
77            if size == 0 {
78                // end of stream
79                break;
80            }
81            if self.line_buf[0] == b'>' {
82                // new record starts
83                self.field_buf = Some(Vec::from(&self.line_buf[1..end_pos(&self.line_buf)]));
84                break;
85            } else {
86                // treat this line as sequence
87                seq_buf.extend_from_slice(&self.line_buf[..end_pos(&self.line_buf)]);
88                self.line_buf.clear();
89            }
90        }
91
92        let seq = match T::try_from(&seq_buf) {
93            Ok(s) => s,
94            Err(_) => {
95                return Some(Err(FastxError::InvalidSequence("TODO".to_string())));
96            }
97        };
98        Some(Ok(Record {
99            fields,
100            seq,
101            quality: None,
102        }))
103    }
104}
105
106impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Iterator for Fasta<R, T> {
107    type Item = Result<Record<T>, FastxError>;
108
109    fn next(&mut self) -> Option<Self::Item> {
110        self.parse_record()
111    }
112}
113
114impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fasta<R, T> {
115    type Item = Result<Record<T>, FastxError>;
116
117    fn poll_next(
118        self: Pin<&mut Self>,
119        _cx: &mut Context<'_>,
120    ) -> Poll<Option<<Self as Stream>::Item>> {
121        let record = unsafe { self.get_unchecked_mut().parse_record() };
122
123        Poll::Ready(record)
124    }
125}
126
127impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Reader<T> for Fasta<R, T> {}
128
#[cfg(test)]
mod tests {
    use super::*;
    use futures_test::task::noop_waker;
    use std::io::Cursor;
    use std::task::{Context, Poll};

    /// Builds a DNA FASTA parser over an in-memory byte slice.
    fn fasta_from(data: &'static [u8]) -> Fasta<Cursor<&'static [u8]>, Seq<Dna>> {
        Fasta::new(Cursor::new(data))
    }

    /// Builds a pinned, boxed DNA FASTA parser for exercising `poll_next`.
    fn pinned_fasta(data: &'static [u8]) -> Pin<Box<Fasta<Cursor<&'static [u8]>, Seq<Dna>>>> {
        Box::pin(fasta_from(data))
    }

    #[test]
    fn test_fasta_iterator() {
        let data = b">SEQ_ID_1\n\
            ACTCGATCGCGACG\n\
            ACACGATCGCGCGC\n\
            CATCGACTACGGCG\n\
            >SEQ_ID_2\n\
            GGGGGGGGGGGGGG\n";
        let reader = Cursor::new(data as &[u8]);
        let mut fasta: Fasta<Cursor<&[u8]>, Seq<Dna>> = Fasta::new(reader);

        let record1 = fasta.next().unwrap().unwrap();
        assert_eq!(record1.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(
            record1.seq,
            dna!("ACTCGATCGCGACGACACGATCGCGCGCCATCGACTACGGCG")
        );

        let record2 = fasta
            .next()
            .expect("Expected a record")
            .expect("Expected valid record");
        assert_eq!(record2.fields, b"SEQ_ID_2".to_vec());
        assert_eq!(record2.seq, dna!("GGGGGGGGGGGGGG"));

        assert!(fasta.next().is_none(), "Expected no more records");
    }

    #[test]
    fn test_fasta_poll_next() {
        let mut fasta = pinned_fasta(
            b">SEQ_ID_1\nAAAAAAAAAAAAAA\nCCCCCCCCCCCCC\nGGGGGGGGGGGG\n>SEQ_ID_2\nTTTTTTTTTTT\n",
        );

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Manual polling using poll_next
        match fasta.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
                assert_eq!(record.seq, dna!("AAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGG"));
            }
            _ => panic!("Unexpected result"),
        }

        match fasta.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_2".to_vec());
                assert_eq!(record.seq, dna!("TTTTTTTTTTT"));
            }
            e => panic!("Unexpected result {:?}", e),
        }

        assert_eq!(fasta.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }

    #[test]
    fn test_fasta_poll_next_with_crlf() {
        let mut fasta = pinned_fasta(
            b">SEQ_ID_1\r\nAAAAAAAAAAAAAA\r\nCCCCCCCCCCCCC\r\nGGGGGGGGGGGG\r\n>SEQ_ID_2\r\nTTTTTTTTTTT",
        );

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Manual polling using poll_next
        match fasta.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
                assert_eq!(record.seq, dna!("AAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGG"));
            }
            _ => panic!("Unexpected result"),
        }

        match fasta.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_2".to_vec());
                assert_eq!(record.seq, dna!("TTTTTTTTTTT"));
            }
            e => panic!("Unexpected result {:?}", e),
        }

        assert_eq!(fasta.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }

    #[test]
    fn test_fasta_poll_next_no_eol() {
        let mut fasta =
            pinned_fasta(b">SEQ_ID_X\nAAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGGACGTAAA");

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Manual polling using poll_next
        match fasta.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(record))) => {
                assert_eq!(record.fields, b"SEQ_ID_X".to_vec());
                assert_eq!(
                    record.seq,
                    dna!("AAAAAAAAAAAAAACCCCCCCCCCCCCGGGGGGGGGGGGACGTAAA")
                );
            }
            _ => panic!("Unexpected result"),
        }

        assert_eq!(fasta.as_mut().poll_next(&mut cx), Poll::Ready(None));
    }

    #[test]
    fn test_fasta_empty_input() {
        let mut fasta = fasta_from(b"");
        assert!(fasta.next().is_none(), "Expected no records from empty input");
    }

    #[test]
    fn test_fasta_missing_header() {
        let mut fasta = fasta_from(b"ACGT\n>SEQ_ID_1\nACGT\n");

        match fasta.next() {
            Some(Err(FastxError::InvalidId(line))) => assert_eq!(line, "ACGT\n"),
            other => panic!("Expected InvalidId, got {:?}", other),
        }

        // Parsing resumes at the next header line.
        let record = fasta
            .next()
            .expect("Expected a record")
            .expect("Expected valid record");
        assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(record.seq, dna!("ACGT"));
        assert!(fasta.next().is_none(), "Expected no more records");
    }

    #[test]
    fn test_fasta_blank_lines_in_sequence() {
        let mut fasta = fasta_from(b">SEQ_ID_1\nACGT\n\nTTTT\n");

        let record = fasta
            .next()
            .expect("Expected a record")
            .expect("Expected valid record");
        assert_eq!(record.fields, b"SEQ_ID_1".to_vec());
        assert_eq!(record.seq, dna!("ACGTTTTT"));
        assert!(fasta.next().is_none(), "Expected no more records");
    }

    #[test]
    fn test_end_pos() {
        assert_eq!(end_pos(b"ACGT\r\n"), 4);
        assert_eq!(end_pos(b"ACGT\n"), 4);
        assert_eq!(end_pos(b"ACGT"), 4);
        assert_eq!(end_pos(b""), 0);
    }
}