seq_events/
fastq.rs

1use std::io::{BufRead, BufReader, Read};
2
3use memchr::{memchr, memchr2, memchr3};
4
5use crate::error::ReaderError;
6use crate::event::Event;
7
/// Default capacity of the internal read buffer: 128 KiB (2^17 bytes).
const DEFAULT_BUFFER_SIZE: usize = 1 << 17;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11enum State {
12    Start,
13    Id,
14    Sequence,
15    Plus,
16    Quality,
17}
18
19/// Zero-copy streaming FASTQ parser.
20pub struct FastqReader<R> {
21    reader: BufReader<R>,
22    pending_consume: usize,
23    state: State,
24    seq_len: usize,
25    qual_len: usize,
26    first_record: bool,
27}
28
29impl<R: Read> FastqReader<R> {
30    /// Creates a reader with default 128 KiB buffer.
31    pub fn new(reader: R) -> Self {
32        Self::with_capacity(DEFAULT_BUFFER_SIZE, reader)
33    }
34
35    /// Creates a reader with specified buffer capacity.
36    pub fn with_capacity(capacity: usize, reader: R) -> Self {
37        Self {
38            reader: BufReader::with_capacity(capacity, reader),
39            pending_consume: 0,
40            state: State::Start,
41            seq_len: 0,
42            qual_len: 0,
43            first_record: true,
44        }
45    }
46
47    /// Returns the next event, or `None` at EOF.
48    pub fn next_event(&mut self) -> Option<Result<Event<'_>, ReaderError>> {
49        loop {
50            if self.pending_consume > 0 {
51                self.reader.consume(self.pending_consume);
52                self.pending_consume = 0;
53            }
54
55            let buf = match self.reader.fill_buf() {
56                Ok(b) if b.is_empty() => return None,
57                Ok(b) => b,
58                Err(e) => return Some(Err(e.into())),
59            };
60
61            let buf_ptr = buf.as_ptr();
62            let buf_len = buf.len();
63
64            match self.state {
65                State::Start => {
66                    let first_non_ws = buf.iter().position(|&b| b != b'\n' && b != b'\r');
67
68                    match first_non_ws {
69                        Some(0) => {
70                            if buf[0] == b'@' {
71                                let is_first = self.first_record;
72                                self.first_record = false;
73                                self.state = State::Id;
74                                self.pending_consume = 1;
75                                self.seq_len = 0;
76                                self.qual_len = 0;
77                                if is_first {
78                                    continue; // First record - no event
79                                }
80                                return Some(Ok(Event::NextRecord));
81                            } else {
82                                return Some(Err(ReaderError::InvalidFormat {
83                                    message: format!(
84                                        "Expected '@' at start of FASTQ record, found '{}'",
85                                        buf[0] as char
86                                    ),
87                                }));
88                            }
89                        }
90                        Some(pos) => {
91                            self.pending_consume = pos;
92                            continue;
93                        }
94                        None => {
95                            self.pending_consume = buf_len;
96                            continue;
97                        }
98                    }
99                }
100
101                State::Id => {
102                    if let Some(newline_pos) = memchr(b'\n', buf) {
103                        let end = if newline_pos > 0 && buf[newline_pos - 1] == b'\r' {
104                            newline_pos - 1
105                        } else {
106                            newline_pos
107                        };
108
109                        self.state = State::Sequence;
110                        self.pending_consume = newline_pos + 1;
111
112                        if end > 0 {
113                            let slice = unsafe { std::slice::from_raw_parts(buf_ptr, end) };
114                            return Some(Ok(Event::IdChunk(slice)));
115                        } else {
116                            continue;
117                        }
118                    } else {
119                        self.pending_consume = buf_len;
120                        let slice = unsafe { std::slice::from_raw_parts(buf_ptr, buf_len) };
121                        return Some(Ok(Event::IdChunk(slice)));
122                    }
123                }
124
125                State::Sequence => {
126                    if buf[0] == b'+' {
127                        if let Some(newline_pos) = memchr(b'\n', buf) {
128                            self.pending_consume = newline_pos + 1;
129                        } else {
130                            self.pending_consume = buf_len;
131                        }
132                        self.state = State::Plus;
133                        continue;
134                    }
135
136                    if buf[0] == b'\n' {
137                        self.pending_consume = 1;
138                        continue;
139                    }
140                    if buf[0] == b'\r' {
141                        self.pending_consume = if buf_len > 1 && buf[1] == b'\n' { 2 } else { 1 };
142                        continue;
143                    }
144
145                    let chunk_end = memchr3(b'\n', b'\r', b'+', buf).unwrap_or(buf_len);
146
147                    if chunk_end == 0 {
148                        self.pending_consume = 1;
149                        continue;
150                    }
151
152                    self.pending_consume = chunk_end;
153                    self.seq_len += chunk_end;
154                    let slice = unsafe { std::slice::from_raw_parts(buf_ptr, chunk_end) };
155                    return Some(Ok(Event::SeqChunk(slice)));
156                }
157
158                State::Plus => {
159                    if buf[0] == b'\n' {
160                        self.pending_consume = 1;
161                        self.state = State::Quality;
162                        continue;
163                    }
164                    if buf[0] == b'\r' {
165                        self.pending_consume = if buf_len > 1 && buf[1] == b'\n' { 2 } else { 1 };
166                        self.state = State::Quality;
167                        continue;
168                    }
169                    self.state = State::Quality;
170                    continue;
171                }
172
173                State::Quality => {
174                    if buf[0] == b'\n' {
175                        self.pending_consume = 1;
176                        continue;
177                    }
178                    if buf[0] == b'\r' {
179                        self.pending_consume = if buf_len > 1 && buf[1] == b'\n' { 2 } else { 1 };
180                        continue;
181                    }
182
183                    let remaining = self.seq_len.saturating_sub(self.qual_len);
184
185                    if remaining == 0 {
186                        self.state = State::Start;
187                        continue;
188                    }
189
190                    let newline_pos = memchr2(b'\n', b'\r', buf);
191                    let chunk_end = match newline_pos {
192                        Some(pos) => pos.min(remaining),
193                        None => buf_len.min(remaining),
194                    };
195
196                    if chunk_end == 0 {
197                        self.pending_consume = 1;
198                        continue;
199                    }
200
201                    self.pending_consume = chunk_end;
202                    self.qual_len += chunk_end;
203                    let slice = unsafe { std::slice::from_raw_parts(buf_ptr, chunk_end) };
204
205                    if self.qual_len >= self.seq_len {
206                        self.state = State::Start;
207                    }
208
209                    return Some(Ok(Event::QualChunk(slice)));
210                }
211            }
212        }
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use std::io::Cursor;
220
221    #[test]
222    fn test_single_record() {
223        let data = b"@read1 description\nACGT\n+\nIIII\n";
224        let mut reader = FastqReader::new(Cursor::new(&data[..]));
225
226        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::IdChunk(id) if id == b"read1 description"));
227        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::SeqChunk(s) if s == b"ACGT"));
228        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::QualChunk(q) if q == b"IIII"));
229        assert!(reader.next_event().is_none());
230    }
231
232    #[test]
233    fn test_multiple_records() {
234        let data = b"@read1\nACGT\n+\nIIII\n@read2\nTGCA\n+\nHHHH\n";
235        let mut reader = FastqReader::new(Cursor::new(&data[..]));
236
237        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::IdChunk(id) if id == b"read1"));
238        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::SeqChunk(s) if s == b"ACGT"));
239        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::QualChunk(q) if q == b"IIII"));
240        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::NextRecord));
241        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::IdChunk(id) if id == b"read2"));
242        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::SeqChunk(s) if s == b"TGCA"));
243        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::QualChunk(q) if q == b"HHHH"));
244        assert!(reader.next_event().is_none());
245    }
246
247    #[test]
248    fn test_crlf_line_endings() {
249        let data = b"@read1\r\nACGT\r\n+\r\nIIII\r\n";
250        let mut reader = FastqReader::new(Cursor::new(&data[..]));
251
252        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::IdChunk(id) if id == b"read1"));
253        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::SeqChunk(s) if s == b"ACGT"));
254        assert!(matches!(reader.next_event().unwrap().unwrap(), Event::QualChunk(q) if q == b"IIII"));
255        assert!(reader.next_event().is_none());
256    }
257
258    #[test]
259    fn test_small_buffer() {
260        let data = b"@read1\nACGTACGT\n+\nIIIIIIII\n";
261        let mut reader = FastqReader::with_capacity(4, Cursor::new(&data[..]));
262
263        let mut id = Vec::new();
264        let mut seq = Vec::new();
265        let mut qual = Vec::new();
266
267        loop {
268            match reader.next_event() {
269                Some(Ok(Event::IdChunk(chunk))) => id.extend_from_slice(chunk),
270                Some(Ok(Event::SeqChunk(chunk))) => seq.extend_from_slice(chunk),
271                Some(Ok(Event::QualChunk(chunk))) => qual.extend_from_slice(chunk),
272                Some(Ok(_)) => panic!("Unexpected event"),
273                Some(Err(e)) => panic!("Error: {}", e),
274                None => break,
275            }
276        }
277
278        assert_eq!(&id, b"read1");
279        assert_eq!(&seq, b"ACGTACGT");
280        assert_eq!(&qual, b"IIIIIIII");
281    }
282}