Skip to main content

fgumi_simd_fastq/
reader.rs

1//! Buffered FASTQ reader using SIMD-accelerated parsing.
2//!
3//! [`SimdFastqReader`] wraps a `BufRead` and yields owned FASTQ records,
4//! serving as a drop-in replacement for `seq_io::fastq::Reader`.
5
6use std::io::{self, BufRead};
7
8use crate::parser::{self, parse_single_record};
9
/// Size in bytes of the internal read buffer when none is specified:
/// 1 MiB (2^20 bytes), the same default `seq_io` uses.
const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
12
/// A single FASTQ record whose name, sequence, and quality each own their bytes.
///
/// Every field is a plain `Vec<u8>`, so a record carries no borrow of the
/// reader or buffer it was parsed from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OwnedFastqRecord {
    /// Read name, excluding the leading `@`.
    pub name: Vec<u8>,
    /// Bases of the read.
    pub sequence: Vec<u8>,
    /// Per-base quality scores as Phred-encoded ASCII bytes.
    pub quality: Vec<u8>,
}
23
/// FASTQ reader that locates record boundaries with SIMD-accelerated scanning.
///
/// Bytes are pulled from the wrapped `BufRead` in chunks. Record boundaries
/// inside each chunk are found with
/// [`find_record_offsets`](crate::find_record_offsets), and every record is
/// handed out as an [`OwnedFastqRecord`].
///
/// # Example
///
/// ```
/// use fgumi_simd_fastq::SimdFastqReader;
/// use std::io::Cursor;
///
/// let data = b"@r1\nACGT\n+\nIIII\n@r2\nTTTT\n+\nJJJJ\n";
/// let mut reader = SimdFastqReader::new(Cursor::new(&data[..]));
///
/// let rec = reader.next().unwrap().unwrap();
/// assert_eq!(rec.name, b"r1");
/// assert_eq!(rec.sequence, b"ACGT");
/// ```
pub struct SimdFastqReader<R: BufRead> {
    /// Source the FASTQ bytes are read from.
    inner: R,
    /// Storage for bytes read from `inner`.
    buffer: Vec<u8>,
    /// How many leading bytes of `buffer` currently hold data from `inner`.
    valid: usize,
    /// Record boundary positions within `buffer[..valid]`; each pair of
    /// consecutive entries delimits one record.
    offsets: Vec<usize>,
    /// Position in `offsets` where the next record to return begins.
    next_record_idx: usize,
    /// Set once a read from `inner` has returned 0 bytes.
    at_eof: bool,
}
55
56impl<R: BufRead> SimdFastqReader<R> {
57    /// Create a new reader with the default buffer size (1 MiB).
58    pub fn new(inner: R) -> Self {
59        Self::with_capacity(inner, DEFAULT_BUFFER_SIZE)
60    }
61
62    /// Create a new reader with a custom buffer capacity.
63    pub fn with_capacity(inner: R, capacity: usize) -> Self {
64        Self {
65            inner,
66            buffer: Vec::with_capacity(capacity),
67            offsets: Vec::new(),
68            next_record_idx: 0,
69            valid: 0,
70            at_eof: false,
71        }
72    }
73
74    /// Fill the internal buffer, preserving any leftover bytes from incomplete records.
75    ///
76    /// Returns `true` if new data was read (or there are still records to yield).
77    fn fill_buffer(&mut self) -> io::Result<bool> {
78        // Determine leftover: bytes from the last complete record offset to end of valid data.
79        let leftover_start = if self.offsets.is_empty() {
80            0
81        } else {
82            // The last offset in `offsets` is the start of the first incomplete record
83            // (or the end of the last complete record, which is the same thing).
84            self.offsets.last().copied().unwrap_or(0)
85        };
86
87        // Move leftover bytes to the front of the buffer
88        if leftover_start > 0 && leftover_start < self.valid {
89            self.buffer.copy_within(leftover_start..self.valid, 0);
90            self.valid -= leftover_start;
91        } else if leftover_start >= self.valid {
92            self.valid = 0;
93        }
94
95        // If the buffer is full of leftover (no complete records found), grow it
96        // so we can read more data and find the end of the current record.
97        if self.valid >= self.buffer.capacity() {
98            self.buffer.reserve(self.buffer.capacity().max(4096));
99        }
100        self.buffer.resize(self.buffer.capacity(), 0);
101
102        // Read new data into the buffer after the leftover
103        let mut total_read = 0;
104        while self.valid + total_read < self.buffer.len() {
105            let buf = &mut self.buffer[self.valid + total_read..];
106            if buf.is_empty() {
107                break;
108            }
109            match self.inner.read(buf) {
110                Ok(0) => {
111                    self.at_eof = true;
112                    break;
113                }
114                Ok(n) => total_read += n,
115                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
116                Err(e) => return Err(e),
117            }
118        }
119
120        self.valid += total_read;
121        self.buffer.truncate(self.valid);
122
123        // Find record boundaries in the buffer
124        self.offsets = parser::find_record_offsets(&self.buffer[..self.valid]);
125        self.next_record_idx = 0;
126
127        // We have data if there are any complete records
128        Ok(self.offsets.len() > 1 || (!self.at_eof && self.valid > 0))
129    }
130}
131
132impl<R: BufRead> Iterator for SimdFastqReader<R> {
133    type Item = io::Result<OwnedFastqRecord>;
134
135    fn next(&mut self) -> Option<Self::Item> {
136        loop {
137            if self.next_record_idx + 1 < self.offsets.len() {
138                let start = self.offsets[self.next_record_idx];
139                let end = self.offsets[self.next_record_idx + 1];
140                self.next_record_idx += 1;
141
142                let borrowed = parse_single_record(&self.buffer[start..end]);
143                return Some(Ok(OwnedFastqRecord {
144                    name: borrowed.name.to_vec(),
145                    sequence: borrowed.sequence.to_vec(),
146                    quality: borrowed.quality.to_vec(),
147                }));
148            }
149
150            if self.at_eof {
151                // Check for leftover bytes that form an incomplete record
152                let leftover_start = self.offsets.last().copied().unwrap_or(0);
153                if leftover_start < self.valid {
154                    return Some(Err(io::Error::new(
155                        io::ErrorKind::InvalidData,
156                        format!(
157                            "Truncated FASTQ record at EOF ({} leftover bytes)",
158                            self.valid - leftover_start
159                        ),
160                    )));
161                }
162                return None;
163            }
164
165            match self.fill_buffer() {
166                Ok(true) => {}
167                Ok(false) => return None,
168                Err(e) => return Some(Err(e)),
169            }
170        }
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, ErrorKind};

    /// Pull the next record from `reader`, panicking if the stream is exhausted
    /// or the record fails to parse.
    fn next_ok(reader: &mut SimdFastqReader<Cursor<&[u8]>>) -> OwnedFastqRecord {
        reader
            .next()
            .expect("reader should yield a record")
            .expect("record should parse successfully")
    }

    #[test]
    fn test_reader_single_record() {
        let data = b"@r1\nACGT\n+\nIIII\n";
        let mut reader = SimdFastqReader::new(Cursor::new(&data[..]));

        let rec = next_ok(&mut reader);
        assert_eq!(rec.name, b"r1");
        assert_eq!(rec.sequence, b"ACGT");
        assert_eq!(rec.quality, b"IIII");

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_multiple_records() {
        let data = b"@r1\nACGT\n+\nIIII\n@r2\nTTTT\n+\nJJJJ\n";
        let mut reader = SimdFastqReader::new(Cursor::new(&data[..]));

        assert_eq!(next_ok(&mut reader).name, b"r1");
        assert_eq!(next_ok(&mut reader).name, b"r2");

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_tiny_buffer() {
        // Use a very small buffer to force multiple refills
        let data = b"@r1\nACGT\n+\nIIII\n@r2\nTTTT\n+\nJJJJ\n";
        let mut reader = SimdFastqReader::with_capacity(Cursor::new(&data[..]), 20);

        let rec1 = next_ok(&mut reader);
        assert_eq!(rec1.name, b"r1");
        assert_eq!(rec1.sequence, b"ACGT");

        let rec2 = next_ok(&mut reader);
        assert_eq!(rec2.name, b"r2");
        assert_eq!(rec2.sequence, b"TTTT");

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_empty_input() {
        let data = b"";
        let mut reader = SimdFastqReader::new(Cursor::new(&data[..]));
        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_long_records() {
        let seq = "A".repeat(500);
        let qual = "I".repeat(500);
        let data = format!("@longread\n{seq}\n+\n{qual}\n");
        let mut reader = SimdFastqReader::with_capacity(Cursor::new(data.as_bytes()), 256);

        let rec = next_ok(&mut reader);
        assert_eq!(rec.name, b"longread");
        assert_eq!(rec.sequence.len(), 500);
        assert_eq!(rec.quality.len(), 500);

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_reader_truncated_final_record_is_error() {
        // A complete record followed by one cut off in the middle of its sequence.
        let data = b"@r1\nACGT\n+\nIIII\n@r2\nTT";
        let mut reader = SimdFastqReader::new(Cursor::new(&data[..]));

        assert_eq!(next_ok(&mut reader).name, b"r1");

        let err = reader
            .next()
            .expect("truncated tail should be reported")
            .expect_err("truncated tail should be an error");
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }
}