fgumi_simd_fastq/
reader.rs1use std::io::{self, BufRead};
7
8use crate::parser::{self, parse_single_record};
9
/// Capacity, in bytes, of the internal buffer created by `SimdFastqReader::new` (1 MiB).
const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
12
/// A FASTQ record whose fields own their bytes.
///
/// Each field is an independent copy of the corresponding slice produced by
/// the parser, so a record stays valid after the reader refills its buffer.
/// `Default` yields a record with all three fields empty, and `Hash` allows
/// records to be used as keys in hashed collections.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct OwnedFastqRecord {
    /// Record name bytes (for a header line `@r1` this is `r1`).
    pub name: Vec<u8>,
    /// Sequence bytes, without the line terminator.
    pub sequence: Vec<u8>,
    /// Quality bytes, without the line terminator.
    pub quality: Vec<u8>,
}
23
/// Buffered FASTQ reader that pulls large chunks from an underlying reader and
/// yields one owned record at a time.
///
/// The type itself places no bound on `R`; the `impl` blocks that actually
/// read from `inner` state the bound they need.
pub struct SimdFastqReader<R> {
    /// Underlying source of FASTQ bytes.
    inner: R,
    /// Chunk buffer; the first `valid` bytes hold unparsed or partially
    /// consumed input.
    buffer: Vec<u8>,
    /// Record boundaries within `buffer` as returned by the parser: record `i`
    /// spans `offsets[i]..offsets[i + 1]`, and the last entry marks where the
    /// not-yet-complete tail begins.
    offsets: Vec<usize>,
    /// Index into `offsets` of the next record to hand out.
    next_record_idx: usize,
    /// Number of meaningful bytes at the front of `buffer`.
    valid: usize,
    /// Set once the underlying reader has reported end of input.
    at_eof: bool,
}
55
56impl<R: BufRead> SimdFastqReader<R> {
57 pub fn new(inner: R) -> Self {
59 Self::with_capacity(inner, DEFAULT_BUFFER_SIZE)
60 }
61
62 pub fn with_capacity(inner: R, capacity: usize) -> Self {
64 Self {
65 inner,
66 buffer: Vec::with_capacity(capacity),
67 offsets: Vec::new(),
68 next_record_idx: 0,
69 valid: 0,
70 at_eof: false,
71 }
72 }
73
74 fn fill_buffer(&mut self) -> io::Result<bool> {
78 let leftover_start = if self.offsets.is_empty() {
80 0
81 } else {
82 self.offsets.last().copied().unwrap_or(0)
85 };
86
87 if leftover_start > 0 && leftover_start < self.valid {
89 self.buffer.copy_within(leftover_start..self.valid, 0);
90 self.valid -= leftover_start;
91 } else if leftover_start >= self.valid {
92 self.valid = 0;
93 }
94
95 if self.valid >= self.buffer.capacity() {
98 self.buffer.reserve(self.buffer.capacity().max(4096));
99 }
100 self.buffer.resize(self.buffer.capacity(), 0);
101
102 let mut total_read = 0;
104 while self.valid + total_read < self.buffer.len() {
105 let buf = &mut self.buffer[self.valid + total_read..];
106 if buf.is_empty() {
107 break;
108 }
109 match self.inner.read(buf) {
110 Ok(0) => {
111 self.at_eof = true;
112 break;
113 }
114 Ok(n) => total_read += n,
115 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
116 Err(e) => return Err(e),
117 }
118 }
119
120 self.valid += total_read;
121 self.buffer.truncate(self.valid);
122
123 self.offsets = parser::find_record_offsets(&self.buffer[..self.valid]);
125 self.next_record_idx = 0;
126
127 Ok(self.offsets.len() > 1 || (!self.at_eof && self.valid > 0))
129 }
130}
131
132impl<R: BufRead> Iterator for SimdFastqReader<R> {
133 type Item = io::Result<OwnedFastqRecord>;
134
135 fn next(&mut self) -> Option<Self::Item> {
136 loop {
137 if self.next_record_idx + 1 < self.offsets.len() {
138 let start = self.offsets[self.next_record_idx];
139 let end = self.offsets[self.next_record_idx + 1];
140 self.next_record_idx += 1;
141
142 let borrowed = parse_single_record(&self.buffer[start..end]);
143 return Some(Ok(OwnedFastqRecord {
144 name: borrowed.name.to_vec(),
145 sequence: borrowed.sequence.to_vec(),
146 quality: borrowed.quality.to_vec(),
147 }));
148 }
149
150 if self.at_eof {
151 let leftover_start = self.offsets.last().copied().unwrap_or(0);
153 if leftover_start < self.valid {
154 return Some(Err(io::Error::new(
155 io::ErrorKind::InvalidData,
156 format!(
157 "Truncated FASTQ record at EOF ({} leftover bytes)",
158 self.valid - leftover_start
159 ),
160 )));
161 }
162 return None;
163 }
164
165 match self.fill_buffer() {
166 Ok(true) => {}
167 Ok(false) => return None,
168 Err(e) => return Some(Err(e)),
169 }
170 }
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two well-formed four-base records, shared by the multi-record tests.
    const TWO_RECORDS: &[u8] = b"@r1\nACGT\n+\nIIII\n@r2\nTTTT\n+\nJJJJ\n";

    /// Pulls the next record, panicking if the reader ends or reports an error.
    fn take_record<R: std::io::BufRead>(reader: &mut SimdFastqReader<R>) -> OwnedFastqRecord {
        reader
            .next()
            .expect("reader should yield a record")
            .expect("record should parse successfully")
    }

    #[test]
    fn test_reader_single_record() {
        let input: &[u8] = b"@r1\nACGT\n+\nIIII\n";
        let mut reader = SimdFastqReader::new(Cursor::new(input));

        let record = take_record(&mut reader);
        assert_eq!(record.name, b"r1");
        assert_eq!(record.sequence, b"ACGT");
        assert_eq!(record.quality, b"IIII");

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_multiple_records() {
        let mut reader = SimdFastqReader::new(Cursor::new(TWO_RECORDS));

        let first = take_record(&mut reader);
        assert_eq!(first.name, b"r1");

        let second = take_record(&mut reader);
        assert_eq!(second.name, b"r2");

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_tiny_buffer() {
        // A 20-byte buffer cannot hold both records at once, forcing a refill.
        let mut reader = SimdFastqReader::with_capacity(Cursor::new(TWO_RECORDS), 20);

        let first = take_record(&mut reader);
        assert_eq!(first.name, b"r1");
        assert_eq!(first.sequence, b"ACGT");

        let second = take_record(&mut reader);
        assert_eq!(second.name, b"r2");
        assert_eq!(second.sequence, b"TTTT");

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_empty_input() {
        let nothing: &[u8] = b"";
        let mut reader = SimdFastqReader::new(Cursor::new(nothing));
        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_long_records() {
        // One record that is larger than the initial 256-byte buffer.
        const READ_LEN: usize = 500;
        let bases = "A".repeat(READ_LEN);
        let quals = "I".repeat(READ_LEN);
        let input = format!("@longread\n{bases}\n+\n{quals}\n");
        let mut reader = SimdFastqReader::with_capacity(Cursor::new(input.as_bytes()), 256);

        let record = take_record(&mut reader);
        assert_eq!(record.name, b"longread");
        assert_eq!(record.sequence.len(), READ_LEN);
        assert_eq!(record.quality.len(), READ_LEN);

        assert!(reader.next().is_none());
    }
}