1use std::io::{BufRead, BufReader, Read};
2
3use memchr::{memchr, memchr2, memchr3};
4
5use crate::error::ReaderError;
6use crate::event::Event;
7
/// Capacity of the internal read buffer used by [`FastqReader::new`] (128 KiB).
const DEFAULT_BUFFER_SIZE: usize = 1 << 17;
9
/// Where the parser currently is inside a FASTQ record.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// Between records: blank lines are skipped and an `@` is expected.
    Start,
    /// Reading the header line that follows the `@`.
    Id,
    /// Reading the sequence section, which may span several lines.
    Sequence,
    /// At the `+` separator line; the quality section follows it.
    Plus,
    /// Reading quality bytes until they match the sequence length.
    Quality,
}
18
19pub struct FastqReader<R> {
21 reader: BufReader<R>,
22 pending_consume: usize,
23 state: State,
24 seq_len: usize,
25 qual_len: usize,
26 first_record: bool,
27}
28
29impl<R: Read> FastqReader<R> {
30 pub fn new(reader: R) -> Self {
32 Self::with_capacity(DEFAULT_BUFFER_SIZE, reader)
33 }
34
35 pub fn with_capacity(capacity: usize, reader: R) -> Self {
37 Self {
38 reader: BufReader::with_capacity(capacity, reader),
39 pending_consume: 0,
40 state: State::Start,
41 seq_len: 0,
42 qual_len: 0,
43 first_record: true,
44 }
45 }
46
47 pub fn next_event(&mut self) -> Option<Result<Event<'_>, ReaderError>> {
49 loop {
50 if self.pending_consume > 0 {
51 self.reader.consume(self.pending_consume);
52 self.pending_consume = 0;
53 }
54
55 let buf = match self.reader.fill_buf() {
56 Ok(b) if b.is_empty() => return None,
57 Ok(b) => b,
58 Err(e) => return Some(Err(e.into())),
59 };
60
61 let buf_ptr = buf.as_ptr();
62 let buf_len = buf.len();
63
64 match self.state {
65 State::Start => {
66 let first_non_ws = buf.iter().position(|&b| b != b'\n' && b != b'\r');
67
68 match first_non_ws {
69 Some(0) => {
70 if buf[0] == b'@' {
71 let is_first = self.first_record;
72 self.first_record = false;
73 self.state = State::Id;
74 self.pending_consume = 1;
75 self.seq_len = 0;
76 self.qual_len = 0;
77 if is_first {
78 continue; }
80 return Some(Ok(Event::NextRecord));
81 } else {
82 return Some(Err(ReaderError::InvalidFormat {
83 message: format!(
84 "Expected '@' at start of FASTQ record, found '{}'",
85 buf[0] as char
86 ),
87 }));
88 }
89 }
90 Some(pos) => {
91 self.pending_consume = pos;
92 continue;
93 }
94 None => {
95 self.pending_consume = buf_len;
96 continue;
97 }
98 }
99 }
100
101 State::Id => {
102 if let Some(newline_pos) = memchr(b'\n', buf) {
103 let end = if newline_pos > 0 && buf[newline_pos - 1] == b'\r' {
104 newline_pos - 1
105 } else {
106 newline_pos
107 };
108
109 self.state = State::Sequence;
110 self.pending_consume = newline_pos + 1;
111
112 if end > 0 {
113 let slice = unsafe { std::slice::from_raw_parts(buf_ptr, end) };
114 return Some(Ok(Event::IdChunk(slice)));
115 } else {
116 continue;
117 }
118 } else {
119 self.pending_consume = buf_len;
120 let slice = unsafe { std::slice::from_raw_parts(buf_ptr, buf_len) };
121 return Some(Ok(Event::IdChunk(slice)));
122 }
123 }
124
125 State::Sequence => {
126 if buf[0] == b'+' {
127 if let Some(newline_pos) = memchr(b'\n', buf) {
128 self.pending_consume = newline_pos + 1;
129 } else {
130 self.pending_consume = buf_len;
131 }
132 self.state = State::Plus;
133 continue;
134 }
135
136 if buf[0] == b'\n' {
137 self.pending_consume = 1;
138 continue;
139 }
140 if buf[0] == b'\r' {
141 self.pending_consume = if buf_len > 1 && buf[1] == b'\n' { 2 } else { 1 };
142 continue;
143 }
144
145 let chunk_end = memchr3(b'\n', b'\r', b'+', buf).unwrap_or(buf_len);
146
147 if chunk_end == 0 {
148 self.pending_consume = 1;
149 continue;
150 }
151
152 self.pending_consume = chunk_end;
153 self.seq_len += chunk_end;
154 let slice = unsafe { std::slice::from_raw_parts(buf_ptr, chunk_end) };
155 return Some(Ok(Event::SeqChunk(slice)));
156 }
157
158 State::Plus => {
159 if buf[0] == b'\n' {
160 self.pending_consume = 1;
161 self.state = State::Quality;
162 continue;
163 }
164 if buf[0] == b'\r' {
165 self.pending_consume = if buf_len > 1 && buf[1] == b'\n' { 2 } else { 1 };
166 self.state = State::Quality;
167 continue;
168 }
169 self.state = State::Quality;
170 continue;
171 }
172
173 State::Quality => {
174 if buf[0] == b'\n' {
175 self.pending_consume = 1;
176 continue;
177 }
178 if buf[0] == b'\r' {
179 self.pending_consume = if buf_len > 1 && buf[1] == b'\n' { 2 } else { 1 };
180 continue;
181 }
182
183 let remaining = self.seq_len.saturating_sub(self.qual_len);
184
185 if remaining == 0 {
186 self.state = State::Start;
187 continue;
188 }
189
190 let newline_pos = memchr2(b'\n', b'\r', buf);
191 let chunk_end = match newline_pos {
192 Some(pos) => pos.min(remaining),
193 None => buf_len.min(remaining),
194 };
195
196 if chunk_end == 0 {
197 self.pending_consume = 1;
198 continue;
199 }
200
201 self.pending_consume = chunk_end;
202 self.qual_len += chunk_end;
203 let slice = unsafe { std::slice::from_raw_parts(buf_ptr, chunk_end) };
204
205 if self.qual_len >= self.seq_len {
206 self.state = State::Start;
207 }
208
209 return Some(Ok(Event::QualChunk(slice)));
210 }
211 }
212 }
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Owned copy of an [`Event`], so a whole event stream can be collected
    /// and checked with a single `assert_eq!`.
    #[derive(Debug, PartialEq)]
    enum Owned {
        Id(Vec<u8>),
        Seq(Vec<u8>),
        Qual(Vec<u8>),
        Next,
    }

    /// Pulls every event out of `reader` until it reports end of input.
    ///
    /// Panics on a reader error, or on any event other than the three chunk
    /// kinds and `Event::NextRecord`.
    fn drain<R: Read>(mut reader: FastqReader<R>) -> Vec<Owned> {
        let mut events = Vec::new();
        while let Some(result) = reader.next_event() {
            events.push(match result {
                Ok(Event::IdChunk(chunk)) => Owned::Id(chunk.to_vec()),
                Ok(Event::SeqChunk(chunk)) => Owned::Seq(chunk.to_vec()),
                Ok(Event::QualChunk(chunk)) => Owned::Qual(chunk.to_vec()),
                Ok(other) => {
                    assert!(matches!(other, Event::NextRecord), "Unexpected event");
                    Owned::Next
                }
                Err(e) => panic!("Error: {}", e),
            });
        }
        events
    }

    #[test]
    fn test_single_record() {
        let data = b"@read1 description\nACGT\n+\nIIII\n";
        let events = drain(FastqReader::new(Cursor::new(&data[..])));
        assert_eq!(
            events,
            vec![
                Owned::Id(b"read1 description".to_vec()),
                Owned::Seq(b"ACGT".to_vec()),
                Owned::Qual(b"IIII".to_vec()),
            ]
        );
    }

    #[test]
    fn test_multiple_records() {
        let data = b"@read1\nACGT\n+\nIIII\n@read2\nTGCA\n+\nHHHH\n";
        let events = drain(FastqReader::new(Cursor::new(&data[..])));
        assert_eq!(
            events,
            vec![
                Owned::Id(b"read1".to_vec()),
                Owned::Seq(b"ACGT".to_vec()),
                Owned::Qual(b"IIII".to_vec()),
                Owned::Next,
                Owned::Id(b"read2".to_vec()),
                Owned::Seq(b"TGCA".to_vec()),
                Owned::Qual(b"HHHH".to_vec()),
            ]
        );
    }

    #[test]
    fn test_crlf_line_endings() {
        let data = b"@read1\r\nACGT\r\n+\r\nIIII\r\n";
        let events = drain(FastqReader::new(Cursor::new(&data[..])));
        assert_eq!(
            events,
            vec![
                Owned::Id(b"read1".to_vec()),
                Owned::Seq(b"ACGT".to_vec()),
                Owned::Qual(b"IIII".to_vec()),
            ]
        );
    }

    #[test]
    fn test_small_buffer() {
        let data = b"@read1\nACGTACGT\n+\nIIIIIIII\n";
        let (mut id, mut seq, mut qual) = (Vec::new(), Vec::new(), Vec::new());

        // With a 4-byte buffer every field arrives in several chunks; stitch
        // them back together per kind.
        for event in drain(FastqReader::with_capacity(4, Cursor::new(&data[..]))) {
            match event {
                Owned::Id(chunk) => id.extend(chunk),
                Owned::Seq(chunk) => seq.extend(chunk),
                Owned::Qual(chunk) => qual.extend(chunk),
                Owned::Next => panic!("Unexpected event"),
            }
        }

        assert_eq!(id, b"read1");
        assert_eq!(seq, b"ACGTACGT");
        assert_eq!(qual, b"IIIIIIII");
    }
}