binseq/bq/reader.rs
1//! Binary sequence reader module
2//!
3//! This module provides functionality for reading binary sequence files using either:
4//! 1. Memory mapping for efficient access to entire files
5//! 2. Streaming for processing data as it arrives
6//!
7//! It supports both sequential and parallel processing of records,
8//! with configurable record layouts for different sequence types.
9
10use std::fs::File;
11use std::io::Read;
12use std::ops::Range;
13use std::path::Path;
14use std::sync::Arc;
15
16use bitnuc::BitSize;
17use bytemuck::cast_slice;
18use memmap2::Mmap;
19
20use super::header::{FileHeader, SIZE_HEADER};
21use crate::{
22 BinseqRecord, DEFAULT_QUALITY_SCORE, Error, ParallelProcessor, ParallelReader,
23 error::{ReadError, Result},
24};
25
/// A reference to a binary sequence record in a memory-mapped file
///
/// This struct provides a view into a single record within a binary sequence file,
/// allowing access to the record's components (sequence data, flags, etc.) without
/// copying the data from the memory-mapped file.
///
/// The record's data is stored in a compact binary format where:
/// - The first u64 contains flags (only when the layout has flags enabled)
/// - Subsequent u64s contain the primary sequence data
/// - If present, final u64s contain the extended sequence data
#[derive(Clone, Copy)]
pub struct RefRecord<'a> {
    /// The position (index) of this record in the file (0-based record index, not byte offset)
    id: u64,
    /// The underlying u64 buffer representing the record's binary data
    /// (length always equals `config.record_size_u64()`, enforced in `new`)
    buffer: &'a [u64],
    /// Reusable default quality buffer, shared with the reader and pre-filled
    /// with the reader's default quality score
    qbuf: &'a [u8],
    /// The configuration that defines the layout and size of record components
    config: RecordConfig,
    /// Cached ASCII header for the record (at most 20 bytes; set via `set_id`)
    header_buf: [u8; 20],
    /// Length of the valid prefix of `header_buf` in bytes
    header_len: usize,
}
51impl<'a> RefRecord<'a> {
52 /// Creates a new record reference
53 ///
54 /// # Arguments
55 ///
56 /// * `id` - The record's position in the file (0-based record index, not byte offset)
57 /// * `buffer` - The u64 slice containing the record's binary data
58 /// * `config` - Configuration defining the record's layout
59 ///
60 /// # Panics
61 ///
62 /// Panics if the buffer length doesn't match the expected size from the config
63 #[must_use]
64 pub fn new(id: u64, buffer: &'a [u64], qbuf: &'a [u8], config: RecordConfig) -> Self {
65 assert_eq!(buffer.len(), config.record_size_u64());
66 Self {
67 id,
68 buffer,
69 qbuf,
70 config,
71 header_buf: [0; 20],
72 header_len: 0,
73 }
74 }
75 /// Returns the record's configuration
76 ///
77 /// The configuration defines the layout and size of the record's components.
78 #[must_use]
79 pub fn config(&self) -> RecordConfig {
80 self.config
81 }
82
83 pub fn set_id(&mut self, id: &[u8]) {
84 self.header_len = id.len();
85 self.header_buf[..self.header_len].copy_from_slice(id);
86 }
87}
88
89impl BinseqRecord for RefRecord<'_> {
90 fn bitsize(&self) -> BitSize {
91 self.config.bitsize
92 }
93 fn index(&self) -> u64 {
94 self.id
95 }
96 /// Clear the buffer and fill it with the sequence header
97 fn sheader(&self) -> &[u8] {
98 &self.header_buf[..self.header_len]
99 }
100
101 /// Clear the buffer and fill it with the extended header
102 fn xheader(&self) -> &[u8] {
103 self.sheader()
104 }
105
106 fn flag(&self) -> Option<u64> {
107 if self.config.flags {
108 Some(self.buffer[0])
109 } else {
110 None
111 }
112 }
113 fn slen(&self) -> u64 {
114 self.config.slen
115 }
116 fn xlen(&self) -> u64 {
117 self.config.xlen
118 }
119 fn sbuf(&self) -> &[u64] {
120 if self.config.flags {
121 &self.buffer[1..=(self.config.schunk as usize)]
122 } else {
123 &self.buffer[..(self.config.schunk as usize)]
124 }
125 }
126 fn xbuf(&self) -> &[u64] {
127 if self.config.flags {
128 &self.buffer[1 + self.config.schunk as usize..]
129 } else {
130 &self.buffer[self.config.schunk as usize..]
131 }
132 }
133 fn squal(&self) -> &[u8] {
134 &self.qbuf[..self.config.slen as usize]
135 }
136 fn xqual(&self) -> &[u8] {
137 &self.qbuf[..self.config.xlen as usize]
138 }
139}
140
/// A reference to a record in the map with a precomputed decoded buffer slice
///
/// Unlike `RefRecord`, the sequence data for a whole batch has already been
/// decoded into a shared byte buffer, so `sseq`/`xseq` can return slices
/// without per-record decoding.
pub struct BatchRecord<'a> {
    /// Unprocessed (encoded) buffer slice, including the leading flag word when present
    buffer: &'a [u64],
    /// Decoded buffer slice for this record (produced by a batched `BitSize::decode`)
    dbuf: &'a [u8],
    /// Record ID (0-based record index within the file)
    id: u64,
    /// The configuration that defines the layout and size of record components
    config: RecordConfig,
    /// A reusable pre-initialized quality score buffer
    qbuf: &'a [u8],
    /// Cached ASCII header for the record (at most 20 bytes)
    header_buf: [u8; 20],
    /// Length of the valid prefix of `header_buf` in bytes
    header_len: usize,
}
158impl BinseqRecord for BatchRecord<'_> {
159 fn bitsize(&self) -> BitSize {
160 self.config.bitsize
161 }
162 fn index(&self) -> u64 {
163 self.id
164 }
165 /// Clear the buffer and fill it with the sequence header
166 fn sheader(&self) -> &[u8] {
167 &self.header_buf[..self.header_len]
168 }
169
170 /// Clear the buffer and fill it with the extended header
171 fn xheader(&self) -> &[u8] {
172 self.sheader()
173 }
174
175 fn flag(&self) -> Option<u64> {
176 if self.config.flags {
177 Some(self.buffer[0])
178 } else {
179 None
180 }
181 }
182 fn slen(&self) -> u64 {
183 self.config.slen
184 }
185 fn xlen(&self) -> u64 {
186 self.config.xlen
187 }
188 fn sbuf(&self) -> &[u64] {
189 if self.config.flags {
190 &self.buffer[1..=(self.config.schunk as usize)]
191 } else {
192 &self.buffer[..(self.config.schunk as usize)]
193 }
194 }
195 fn xbuf(&self) -> &[u64] {
196 if self.config.flags {
197 &self.buffer[1 + self.config.schunk as usize..]
198 } else {
199 &self.buffer[self.config.schunk as usize..]
200 }
201 }
202 fn decode_s(&self, dbuf: &mut Vec<u8>) -> Result<()> {
203 dbuf.extend_from_slice(self.sseq());
204 Ok(())
205 }
206 fn decode_x(&self, dbuf: &mut Vec<u8>) -> Result<()> {
207 dbuf.extend_from_slice(self.xseq());
208 Ok(())
209 }
210 /// Override this method since we can make use of block information
211 fn sseq(&self) -> &[u8] {
212 let scalar = self.config.scalar();
213 let mut lbound = 0;
214 let mut rbound = self.config.slen();
215 if self.config.flags {
216 lbound += scalar;
217 rbound += scalar;
218 }
219 &self.dbuf[lbound..rbound]
220 }
221 /// Override this method since we can make use of block information
222 fn xseq(&self) -> &[u8] {
223 let scalar = self.config.scalar();
224 let mut lbound = scalar * self.config.schunk();
225 let mut rbound = lbound + self.config.xlen();
226 if self.config.flags {
227 lbound += scalar;
228 rbound += scalar;
229 }
230 &self.dbuf[lbound..rbound]
231 }
232 fn squal(&self) -> &[u8] {
233 &self.qbuf[..self.config.slen()]
234 }
235 fn xqual(&self) -> &[u8] {
236 &self.qbuf[..self.config.xlen()]
237 }
238}
239
/// Configuration for binary sequence record layout
///
/// This struct defines the size and layout of binary sequence records,
/// including both primary sequence data and optional extended data.
/// It handles the translation between sequence lengths in base pairs
/// and the number of u64 chunks needed to store the compressed data.
#[derive(Clone, Copy)]
pub struct RecordConfig {
    /// The primary sequence length in base pairs
    slen: u64,
    /// The extended sequence length in base pairs (0 when unpaired)
    xlen: u64,
    /// The number of u64 chunks needed to store the primary sequence
    /// (each u64 stores 32 nucleotides at 2-bit encoding, 16 at 4-bit)
    schunk: u64,
    /// The number of u64 chunks needed to store the extended sequence
    /// (each u64 stores 32 values at 2-bit encoding, 16 at 4-bit)
    xchunk: u64,
    /// The bitsize (encoding width) of the record
    bitsize: BitSize,
    /// Whether a leading flag word is present in each record
    flags: bool,
}
263impl RecordConfig {
264 /// Creates a new record configuration
265 ///
266 /// This constructor initializes a configuration for a binary sequence record
267 /// with specified primary and extended sequence lengths.
268 ///
269 /// # Arguments
270 ///
271 /// * `slen` - The length of primary sequences in the file
272 /// * `xlen` - The length of secondary/extended sequences in the file
273 /// * `bitsize` - The bitsize of the record
274 /// * `flags` - Whether flags are present
275 ///
276 /// # Returns
277 ///
278 /// A new `RecordConfig` instance with the specified sequence lengths
279 pub fn new(slen: usize, xlen: usize, bitsize: BitSize, flags: bool) -> Self {
280 let (schunk, xchunk) = match bitsize {
281 BitSize::Two => (slen.div_ceil(32), xlen.div_ceil(32)),
282 BitSize::Four => (slen.div_ceil(16), xlen.div_ceil(16)),
283 };
284 Self {
285 slen: slen as u64,
286 xlen: xlen as u64,
287 schunk: schunk as u64,
288 xchunk: xchunk as u64,
289 bitsize,
290 flags,
291 }
292 }
293
294 /// Creates a new record configuration from a header
295 ///
296 /// This constructor initializes a configuration based on a header that contains
297 /// the sequence lengths for primary and extended sequences.
298 ///
299 /// # Arguments
300 ///
301 /// * `header` - A reference to a `FileHeader` containing sequence lengths
302 ///
303 /// # Returns
304 ///
305 /// A new `RecordConfig` instance with the sequence lengths from the header
306 pub fn from_header(header: &FileHeader) -> Self {
307 Self::new(
308 header.slen as usize,
309 header.xlen as usize,
310 header.bits,
311 header.flags,
312 )
313 }
314
315 /// Returns whether this record contains extended sequence data
316 ///
317 /// A record is considered paired if it has a non-zero extended sequence length.
318 pub fn paired(&self) -> bool {
319 self.xlen > 0
320 }
321
322 /// Returns the primary sequence length in base pairs
323 ///
324 /// This method returns the length of the primary sequence in base pairs.
325 pub fn slen(&self) -> usize {
326 self.slen as usize
327 }
328
329 /// Returns the extended sequence length in base pairs
330 ///
331 /// This method returns the length of the extended sequence in base pairs.
332 pub fn xlen(&self) -> usize {
333 self.xlen as usize
334 }
335
336 /// Returns the number of u64 chunks needed to store the primary sequence
337 ///
338 /// This method returns the number of u64 chunks required to store the primary
339 /// sequence, where each u64 stores 32 nucleotides.
340 pub fn schunk(&self) -> usize {
341 self.schunk as usize
342 }
343
344 /// Returns the number of u64 chunks needed to store the extended sequence
345 ///
346 /// This method returns the number of u64 chunks required to store the extended
347 /// sequence, where each u64 stores 32 values.
348 pub fn xchunk(&self) -> usize {
349 self.xchunk as usize
350 }
351
352 /// Returns the full record size in bytes (u8):
353 /// 8 * (schunk + xchunk + 1 (flag))
354 pub fn record_size_bytes(&self) -> usize {
355 8 * self.record_size_u64()
356 }
357
358 /// Returns the full record size in u64
359 /// schunk + xchunk + 1 (flag)
360 pub fn record_size_u64(&self) -> usize {
361 if self.flags {
362 (self.schunk + self.xchunk + 1) as usize
363 } else {
364 (self.schunk + self.xchunk) as usize
365 }
366 }
367
368 /// The number of nucleotides per word
369 pub fn scalar(&self) -> usize {
370 match self.bitsize {
371 BitSize::Two => 32,
372 BitSize::Four => 16,
373 }
374 }
375}
376
/// A memory-mapped reader for binary sequence files
///
/// This reader provides efficient access to binary sequence files by memory-mapping
/// them instead of performing traditional I/O operations. It supports both
/// sequential access to individual records and parallel processing of records
/// across multiple threads.
///
/// The reader ensures thread-safety through the use of `Arc` for sharing the
/// memory-mapped data between threads.
///
/// Records are returned as [`RefRecord`] which implement the [`BinseqRecord`] trait.
///
/// # Examples
///
/// ```
/// use binseq::bq::MmapReader;
/// use binseq::Result;
///
/// fn main() -> Result<()> {
///     let path = "./data/subset.bq";
///     let reader = MmapReader::new(path)?;
///
///     // Calculate the number of records in the file
///     let num_records = reader.num_records();
///     println!("Number of records: {}", num_records);
///
///     // Get the record at index 20 (0-indexed)
///     let record = reader.get(20)?;
///
///     Ok(())
/// }
/// ```
pub struct MmapReader {
    /// Memory mapped file contents, wrapped in Arc for thread-safe sharing
    mmap: Arc<Mmap>,

    /// Binary sequence file header containing format information
    header: FileHeader,

    /// Configuration defining the layout of records in the file
    /// (derived from `header` at construction time)
    config: RecordConfig,

    /// Reusable buffer for quality scores, pre-filled with
    /// `default_quality_score` and sized for the longer of slen/xlen
    qbuf: Vec<u8>,

    /// Default quality score for records without quality scores
    default_quality_score: u8,
}
425
426impl MmapReader {
427 /// Creates a new memory-mapped reader for a binary sequence file
428 ///
429 /// This method opens the file, memory-maps its contents, and validates
430 /// the file structure to ensure it contains valid binary sequence data.
431 ///
432 /// # Arguments
433 ///
434 /// * `path` - Path to the binary sequence file
435 ///
436 /// # Returns
437 ///
438 /// * `Ok(MmapReader)` - A new reader if the file is valid
439 /// * `Err(Error)` - If the file is invalid or cannot be opened
440 ///
441 /// # Errors
442 ///
443 /// Returns an error if:
444 /// * The file cannot be opened
445 /// * The file is not a regular file
446 /// * The file header is invalid
447 /// * The file size doesn't match the expected size based on the header
448 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
449 // Verify input file is a file before attempting to map
450 let file = File::open(path)?;
451 if !file.metadata()?.is_file() {
452 return Err(ReadError::IncompatibleFile.into());
453 }
454
455 // Safety: the file is open and won't be modified while mapped
456 let mmap = unsafe { Mmap::map(&file)? };
457
458 // Read header from mapped memory
459 let header = FileHeader::from_buffer(&mmap)?;
460
461 // Record configuraration
462 let config = RecordConfig::from_header(&header);
463
464 // Immediately validate the size of the file against the expected byte size of records
465 if !(mmap.len() - SIZE_HEADER).is_multiple_of(config.record_size_bytes()) {
466 return Err(ReadError::FileTruncation(mmap.len()).into());
467 }
468
469 // preinitialize quality buffer
470 let qbuf = vec![DEFAULT_QUALITY_SCORE; header.slen.max(header.xlen) as usize];
471
472 Ok(Self {
473 mmap: Arc::new(mmap),
474 header,
475 config,
476 qbuf,
477 default_quality_score: DEFAULT_QUALITY_SCORE,
478 })
479 }
480
481 /// Returns the total number of records in the file
482 ///
483 /// This is calculated by subtracting the header size from the total file size
484 /// and dividing by the size of each record.
485 #[must_use]
486 pub fn num_records(&self) -> usize {
487 (self.mmap.len() - SIZE_HEADER) / self.config.record_size_bytes()
488 }
489
490 /// Returns a copy of the binary sequence file header
491 ///
492 /// The header contains format information and sequence length specifications.
493 #[must_use]
494 pub fn header(&self) -> FileHeader {
495 self.header
496 }
497
498 /// Checks if the file has paired-records
499 #[must_use]
500 pub fn is_paired(&self) -> bool {
501 self.header.is_paired()
502 }
503
504 /// Sets the default quality score for records without quality information
505 pub fn set_default_quality_score(&mut self, score: u8) {
506 self.default_quality_score = score;
507 self.qbuf = self.build_qbuf();
508 }
509
510 /// Creates a new quality score buffer
511 #[must_use]
512 pub fn build_qbuf(&self) -> Vec<u8> {
513 vec![self.default_quality_score; self.header.slen.max(self.header.xlen) as usize]
514 }
515
516 /// Returns a reference to a specific record
517 ///
518 /// # Arguments
519 ///
520 /// * `idx` - The index of the record to retrieve (0-based)
521 ///
522 /// # Returns
523 ///
524 /// * `Ok(RefRecord)` - A reference to the requested record
525 /// * `Err(Error)` - If the index is out of bounds
526 ///
527 /// # Errors
528 ///
529 /// Returns an error if the requested index is beyond the number of records in the file
530 pub fn get(&self, idx: usize) -> Result<RefRecord<'_>> {
531 if idx > self.num_records() {
532 return Err(ReadError::OutOfRange {
533 requested_index: idx,
534 max_index: self.num_records(),
535 }
536 .into());
537 }
538 let rsize = self.config.record_size_bytes();
539 let lbound = SIZE_HEADER + (idx * rsize);
540 let rbound = lbound + rsize;
541 let bytes = &self.mmap[lbound..rbound];
542 let buffer = cast_slice(bytes);
543 Ok(RefRecord::new(idx as u64, buffer, &self.qbuf, self.config))
544 }
545
546 /// Returns a slice of the buffer containing the underlying u64 for that range
547 /// of records.
548 ///
549 /// Note: range 10..40 will return all u64s in the mmap between the record index 10 and 40
550 pub fn get_buffer_slice(&self, range: Range<usize>) -> Result<&[u64]> {
551 if range.end > self.num_records() {
552 return Err(ReadError::OutOfRange {
553 requested_index: range.end,
554 max_index: self.num_records(),
555 }
556 .into());
557 }
558 let rsize = self.config.record_size_bytes();
559 let total_records = range.end - range.start;
560 let lbound = SIZE_HEADER + (range.start * rsize);
561 let rbound = lbound + (total_records * rsize);
562 let bytes = &self.mmap[lbound..rbound];
563 let buffer = cast_slice(bytes);
564 Ok(buffer)
565 }
566}
567
568/// A reader for streaming binary sequence data from any source that implements Read
569///
570/// Unlike `MmapReader` which requires the entire file to be accessible at once,
571/// `StreamReader` processes data as it becomes available, making it suitable for:
572/// - Processing data as it arrives over a network
573/// - Handling very large files that exceed available memory
574/// - Pipeline processing where data is flowing continuously
575///
576/// The reader maintains an internal buffer and can handle partial record reconstruction
577/// across chunk boundaries.
578pub struct StreamReader<R: Read> {
579 /// The source reader for binary sequence data
580 reader: R,
581
582 /// Binary sequence file header containing format information
583 header: Option<FileHeader>,
584
585 /// Configuration defining the layout of records in the file
586 config: Option<RecordConfig>,
587
588 /// Buffer for storing incoming data
589 buffer: Vec<u8>,
590
591 /// Buffer for reusable quality scores
592 qbuf: Vec<u8>,
593
594 /// Default quality score for records without quality information
595 default_quality_score: u8,
596
597 /// Current position in the buffer
598 buffer_pos: usize,
599
600 /// Length of valid data in the buffer
601 buffer_len: usize,
602}
603
604impl<R: Read> StreamReader<R> {
605 /// Creates a new `StreamReader` with the default buffer size
606 ///
607 /// This constructor initializes a `StreamReader` that will read from the provided
608 /// source, using an 8K default buffer size.
609 ///
610 /// # Arguments
611 ///
612 /// * `reader` - The source to read binary sequence data from
613 ///
614 /// # Returns
615 ///
616 /// A new `StreamReader` instance
617 pub fn new(reader: R) -> Self {
618 Self::with_capacity(reader, 8192)
619 }
620
621 /// Creates a new `StreamReader` with a specified buffer capacity
622 ///
623 /// This constructor initializes a `StreamReader` with a custom buffer size,
624 /// which can be tuned based on the expected usage pattern.
625 ///
626 /// # Arguments
627 ///
628 /// * `reader` - The source to read binary sequence data from
629 /// * `capacity` - The size of the internal buffer in bytes
630 ///
631 /// # Returns
632 ///
633 /// A new `StreamReader` instance with the specified buffer capacity
634 pub fn with_capacity(reader: R, capacity: usize) -> Self {
635 Self {
636 reader,
637 header: None,
638 config: None,
639 buffer: vec![0; capacity],
640 qbuf: vec![0; capacity],
641 buffer_pos: 0,
642 buffer_len: 0,
643 default_quality_score: DEFAULT_QUALITY_SCORE,
644 }
645 }
646
647 /// Sets the default quality score for records without quality information
648 pub fn set_default_quality_score(&mut self, score: u8) {
649 if score != self.default_quality_score {
650 self.qbuf.clear();
651 }
652 self.default_quality_score = score;
653 }
654
655 /// Reads and validates the header from the underlying reader
656 ///
657 /// This method reads the binary sequence file header and validates it.
658 /// It caches the header internally for future use.
659 ///
660 /// # Returns
661 ///
662 /// * `Ok(&FileHeader)` - A reference to the validated header
663 /// * `Err(Error)` - If reading or validating the header fails
664 ///
665 /// # Panics
666 ///
667 /// Panics if the header is missing when expected in the stream.
668 ///
669 /// # Errors
670 ///
671 /// Returns an error if:
672 /// * There is an I/O error when reading from the source
673 /// * The header data is invalid
674 /// * End of stream is reached before the full header can be read
675 pub fn read_header(&mut self) -> Result<&FileHeader> {
676 if self.header.is_some() {
677 return Ok(self
678 .header
679 .as_ref()
680 .expect("Missing header when expected in stream"));
681 }
682
683 // Ensure we have enough data for the header
684 while self.buffer_len - self.buffer_pos < SIZE_HEADER {
685 self.fill_buffer()?;
686 }
687
688 // Parse header
689 let header_slice = &self.buffer[self.buffer_pos..self.buffer_pos + SIZE_HEADER];
690 let header = FileHeader::from_buffer(header_slice)?;
691
692 self.header = Some(header);
693 self.config = Some(RecordConfig::from_header(&header));
694 self.buffer_pos += SIZE_HEADER;
695
696 Ok(self.header.as_ref().unwrap())
697 }
698
699 /// Fills the internal buffer with more data from the reader
700 ///
701 /// This method reads more data from the underlying reader, handling
702 /// the case where some unprocessed data remains in the buffer.
703 ///
704 /// # Returns
705 ///
706 /// * `Ok(())` - If the buffer was successfully filled with new data
707 /// * `Err(Error)` - If reading from the source fails
708 ///
709 /// # Errors
710 ///
711 /// Returns an error if:
712 /// * There is an I/O error when reading from the source
713 /// * End of stream is reached (no more data available)
714 fn fill_buffer(&mut self) -> Result<()> {
715 // Move remaining data to beginning of buffer if needed
716 if self.buffer_pos > 0 && self.buffer_pos < self.buffer_len {
717 self.buffer.copy_within(self.buffer_pos..self.buffer_len, 0);
718 self.buffer_len -= self.buffer_pos;
719 self.buffer_pos = 0;
720 } else if self.buffer_pos == self.buffer_len {
721 self.buffer_len = 0;
722 self.buffer_pos = 0;
723 }
724
725 // Read more data
726 let bytes_read = self.reader.read(&mut self.buffer[self.buffer_len..])?;
727 if bytes_read == 0 {
728 return Err(ReadError::EndOfStream.into());
729 }
730
731 self.buffer_len += bytes_read;
732 Ok(())
733 }
734
735 /// Retrieves the next record from the stream
736 ///
737 /// This method reads and processes the next complete record from the stream.
738 /// It handles the case where a record spans multiple buffer fills.
739 ///
740 /// # Returns
741 ///
742 /// * `Ok(Some(RefRecord))` - The next record was successfully read
743 /// * `Ok(None)` - End of stream was reached (no more records)
744 /// * `Err(Error)` - If an error occurred during reading
745 ///
746 /// # Panics
747 ///
748 /// Panics if the configuration is missing when expected in the stream.
749 ///
750 /// # Errors
751 ///
752 /// Returns an error if:
753 /// * There is an I/O error when reading from the source
754 /// * The header has not been read yet
755 /// * The data format is invalid
756 pub fn next_record(&mut self) -> Option<Result<RefRecord<'_>>> {
757 // Ensure header is read
758 if self.header.is_none()
759 && let Some(e) = self.read_header().err()
760 {
761 return Some(Err(e));
762 }
763
764 let config = self
765 .config
766 .expect("Missing configuration when expected in stream");
767 let record_size = config.record_size_bytes();
768
769 // Ensure we have enough data for a complete record
770 while self.buffer_len - self.buffer_pos < record_size {
771 match self.fill_buffer() {
772 Ok(()) => {}
773 Err(Error::ReadError(ReadError::EndOfStream)) => {
774 // End of stream reached - if we have any partial data, it's an error
775 if self.buffer_len - self.buffer_pos > 0 {
776 return Some(Err(ReadError::PartialRecord(
777 self.buffer_len - self.buffer_pos,
778 )
779 .into()));
780 }
781 return None;
782 }
783 Err(e) => return Some(Err(e)),
784 }
785 }
786
787 // Process record
788 let record_start = self.buffer_pos;
789 self.buffer_pos += record_size;
790
791 let record_bytes = &self.buffer[record_start..record_start + record_size];
792 let record_u64s = cast_slice(record_bytes);
793
794 // update quality score buffer if necessary
795 if self.qbuf.is_empty() {
796 let max_size = config.slen.max(config.xlen) as usize;
797 self.qbuf.resize(max_size, self.default_quality_score);
798 }
799
800 // Create record with incremental ID (based on read position)
801 let id = (record_start - SIZE_HEADER) / record_size;
802 Some(Ok(RefRecord::new(
803 id as u64,
804 record_u64s,
805 &self.qbuf,
806 config,
807 )))
808 }
809
810 /// Consumes the stream reader and returns the inner reader
811 ///
812 /// This method is useful when you need access to the underlying reader
813 /// after processing is complete.
814 ///
815 /// # Returns
816 ///
817 /// The inner reader that was used by this `StreamReader`
818 pub fn into_inner(self) -> R {
819 self.reader
820 }
821}
822
/// Default batch size for parallel processing
///
/// This constant defines how many records each thread processes (and
/// batch-decodes at once) during parallel processing operations.
pub const BATCH_SIZE: usize = 1024;
828
829/// Parallel processing implementation for memory-mapped readers
830impl ParallelReader for MmapReader {
831 /// Processes all records in parallel using multiple threads
832 ///
833 /// This method distributes the records across the specified number of threads
834 /// and processes them using the provided processor. Each thread receives its
835 /// own clone of the processor and processes a contiguous chunk of records.
836 ///
837 /// # Arguments
838 ///
839 /// * `processor` - The processor to use for handling records
840 /// * `num_threads` - The number of threads to use for processing
841 ///
842 /// # Type Parameters
843 ///
844 /// * `P` - A type that implements `ParallelProcessor` and can be cloned
845 ///
846 /// # Returns
847 ///
848 /// * `Ok(())` - If all records were processed successfully
849 /// * `Err(Error)` - If an error occurred during processing
850 fn process_parallel<P: ParallelProcessor + Clone + 'static>(
851 self,
852 processor: P,
853 num_threads: usize,
854 ) -> Result<()> {
855 let num_records = self.num_records();
856 self.process_parallel_range(processor, num_threads, 0..num_records)
857 }
858
859 /// Process records in parallel within a specified range
860 ///
861 /// This method allows parallel processing of a subset of records within the file,
862 /// defined by a start and end index. The range is distributed across the specified
863 /// number of threads.
864 ///
865 /// # Arguments
866 ///
867 /// * `processor` - The processor to use for each record
868 /// * `num_threads` - The number of threads to spawn
869 /// * `range` - The range of record indices to process
870 ///
871 /// # Type Parameters
872 ///
873 /// * `P` - A type that implements `ParallelProcessor` and can be cloned
874 ///
875 /// # Returns
876 ///
877 /// * `Ok(())` - If all records were processed successfully
878 /// * `Err(Error)` - If an error occurred during processing
879 fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
880 self,
881 processor: P,
882 num_threads: usize,
883 range: Range<usize>,
884 ) -> Result<()> {
885 // Calculate the number of threads to use
886 let num_threads = if num_threads == 0 {
887 num_cpus::get()
888 } else {
889 num_threads.min(num_cpus::get())
890 };
891
892 // Validate range
893 let num_records = self.num_records();
894 self.validate_range(num_records, &range)?;
895
896 // Calculate number of records for each thread within the range
897 let range_size = range.end - range.start;
898 let records_per_thread = range_size.div_ceil(num_threads);
899
900 // Arc self
901 let reader = Arc::new(self);
902
903 // Build thread handles
904 let mut handles = Vec::new();
905 for tid in 0..num_threads {
906 let mut processor = processor.clone();
907 let reader = reader.clone();
908 processor.set_tid(tid);
909
910 let handle = std::thread::spawn(move || -> Result<()> {
911 let start_idx = range.start + tid * records_per_thread;
912 let end_idx = (start_idx + records_per_thread).min(range.end);
913
914 if start_idx >= end_idx {
915 return Ok(()); // No records for this thread
916 }
917
918 // create a reusable buffer for translating record IDs
919 let mut translater = itoa::Buffer::new();
920
921 // initialize a decoding buffer
922 let mut dbuf = Vec::new();
923
924 // initialize a quality score buffer
925 let qbuf = reader.build_qbuf();
926
927 // calculate the size of a record in the cast u64 slice
928 let rsize_u64 = reader.config.record_size_bytes() / 8;
929
930 // determine the required scalar size
931 let scalar = reader.config.scalar();
932
933 // calculate the size of a record in the batch decoded buffer
934 let mut dbuf_rsize = { (reader.config.schunk() + reader.config.xchunk()) * scalar };
935 if reader.config.flags {
936 dbuf_rsize += scalar;
937 }
938
939 // iterate over the range of indices
940 for range_start in (start_idx..end_idx).step_by(BATCH_SIZE) {
941 let range_end = (range_start + BATCH_SIZE).min(end_idx);
942
943 // clear the decoded buffer
944 dbuf.clear();
945
946 // get the encoded buffer slice
947 let ebuf = reader.get_buffer_slice(range_start..range_end)?;
948
949 // decode the entire buffer at once (with flags and extra bases)
950 reader
951 .config
952 .bitsize
953 .decode(ebuf, ebuf.len() * scalar, &mut dbuf)?;
954
955 // iterate over each index in the range
956 for (inner_idx, idx) in (range_start..range_end).enumerate() {
957 // translate the index
958 let id_str = translater.format(idx);
959
960 // create the index buffer
961 let mut header_buf = [0; 20];
962 let header_len = id_str.len();
963 header_buf[..header_len].copy_from_slice(id_str.as_bytes());
964
965 // find the buffer starts
966 let ebuf_start = inner_idx * rsize_u64;
967 let dbuf_start = inner_idx * dbuf_rsize;
968
969 // initialize the record
970 let record = BatchRecord {
971 buffer: &ebuf[ebuf_start..(ebuf_start + rsize_u64)],
972 dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize)],
973 qbuf: &qbuf,
974 id: idx as u64,
975 config: reader.config,
976 header_buf,
977 header_len,
978 };
979
980 // process the record
981 processor.process_record(record)?;
982 }
983
984 // process the batch
985 processor.on_batch_complete()?;
986 }
987
988 Ok(())
989 });
990
991 handles.push(handle);
992 }
993
994 for handle in handles {
995 handle
996 .join()
997 .expect("Error joining handle (1)")
998 .expect("Error joining handle (2)");
999 }
1000
1001 Ok(())
1002 }
1003}
1004
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BinseqRecord;
    use bitnuc::BitSize;

    /// Shared fixture: a small BINSEQ file checked into the repository.
    /// Every test below reads this file; none of them mutate it.
    const TEST_BQ_FILE: &str = "./data/subset.bq";

    // ==================== MmapReader Basic Tests ====================

    /// Opening the fixture file must succeed.
    #[test]
    fn test_mmap_reader_new() {
        let reader = MmapReader::new(TEST_BQ_FILE);
        assert!(reader.is_ok(), "Failed to create reader");
    }

    /// The fixture is expected to contain at least one record.
    #[test]
    fn test_mmap_reader_num_records() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();
        assert!(num_records > 0, "Expected non-zero records");
    }

    /// Smoke test: `is_paired()` must not panic on a valid file.
    ///
    /// NOTE(review): the previous assertion (`is_paired || !is_paired`) was a
    /// tautology that could never fail; a typed binding preserves the original
    /// intent (the method is callable and returns a `bool`) without the
    /// meaningless assert.
    #[test]
    fn test_mmap_reader_is_paired() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let _is_paired: bool = reader.is_paired();
    }

    /// The parsed header must report a non-zero sequence length.
    #[test]
    fn test_mmap_reader_header_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        assert!(header.slen > 0, "Expected non-zero sequence length");
    }

    /// A `RecordConfig` derived from the header must carry the sequence length.
    #[test]
    fn test_mmap_reader_config_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        let config = RecordConfig::from_header(&header);
        assert!(
            config.slen > 0,
            "Expected non-zero sequence length in config"
        );
    }

    // ==================== Record Access Tests ====================

    /// Fetching record 0 succeeds and reports index 0.
    #[test]
    fn test_get_record() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        if num_records > 0 {
            let record = reader.get(0);
            assert!(record.is_ok(), "Expected to get first record");

            let record = record.unwrap();
            assert_eq!(record.index(), 0, "Expected record index to be 0");
        }
    }

    /// Indexing past the end of the file must yield an error, not a panic.
    #[test]
    fn test_get_record_out_of_bounds() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        let record = reader.get(num_records + 100);
        assert!(record.is_err(), "Expected error for out of bounds index");
    }

    /// A record's sequence buffer and reported length must both be non-empty.
    #[test]
    fn test_record_sequence_data() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let sbuf = record.sbuf();
            assert!(!sbuf.is_empty(), "Expected non-empty sequence buffer");

            let slen = record.slen();
            assert!(slen > 0, "Expected non-zero sequence length");
        }
    }

    /// The synthesized quality buffer must be exactly as long as the sequence.
    #[test]
    fn test_record_quality_data() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let squal = record.squal();
            let slen = record.slen() as usize;
            assert_eq!(
                squal.len(),
                slen,
                "Quality length should match sequence length"
            );
        }
    }

    // ==================== Default Quality Score Tests ====================

    /// Overriding the default quality score must be reflected in every
    /// position of a record's quality buffer.
    #[test]
    fn test_set_default_quality_score() {
        let mut reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let custom_score = 42u8;

        reader.set_default_quality_score(custom_score);

        if let Ok(record) = reader.get(0) {
            let squal = record.squal();
            // All quality scores should be the custom score
            assert!(
                squal.iter().all(|&q| q == custom_score),
                "All quality scores should be {}",
                custom_score
            );
        }
    }

    // ==================== Parallel Processing Tests ====================

    /// Minimal processor that counts how many records it was handed.
    /// The shared counter makes the total visible after the threads join.
    #[derive(Clone)]
    struct CountingProcessor {
        count: Arc<std::sync::Mutex<usize>>,
    }

    impl ParallelProcessor for CountingProcessor {
        fn process_record<R: BinseqRecord>(&mut self, _record: R) -> Result<()> {
            let mut count = self.count.lock().unwrap();
            *count += 1;
            Ok(())
        }
    }

    /// Running with two threads must visit every record exactly once.
    #[test]
    fn test_parallel_processing() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        let count = Arc::new(std::sync::Mutex::new(0));
        let processor = CountingProcessor {
            count: count.clone(),
        };

        reader.process_parallel(processor, 2).unwrap();

        let final_count = *count.lock().unwrap();
        assert_eq!(final_count, num_records, "All records should be processed");
    }

    /// Processing a sub-range must visit exactly `end - start` records.
    #[test]
    fn test_parallel_processing_range() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        if num_records >= 100 {
            let start = 10;
            let end = 50;
            let expected_count = end - start;

            let count = Arc::new(std::sync::Mutex::new(0));
            let processor = CountingProcessor {
                count: count.clone(),
            };

            reader
                .process_parallel_range(processor, 2, start..end)
                .unwrap();

            let final_count = *count.lock().unwrap();
            assert_eq!(
                final_count, expected_count,
                "Should process exactly {} records",
                expected_count
            );
        }
    }

    // ==================== RecordConfig Tests ====================

    /// `RecordConfig::from_header` must copy the header fields faithfully.
    #[test]
    fn test_record_config_from_header() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        let config = RecordConfig::from_header(&header);

        assert_eq!(config.slen, header.slen as u64, "Sequence length mismatch");
        assert_eq!(config.xlen, header.xlen as u64, "Extended length mismatch");
        assert_eq!(config.bitsize, header.bits, "Bit size mismatch");
    }

    /// The byte size of a record must be exactly 8x its size in u64 words.
    #[test]
    fn test_record_config_record_size() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        let config = RecordConfig::from_header(&header);

        let size_u64 = config.record_size_u64();
        assert!(size_u64 > 0, "Record size should be non-zero");

        let size_bytes = config.record_size_bytes();
        assert_eq!(size_bytes, size_u64 * 8, "Byte size should be 8x u64 size");
    }

    // ==================== RefRecord Tests ====================

    /// A record's bit size must be one of the two supported encodings.
    #[test]
    fn test_ref_record_bitsize() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let bitsize = record.bitsize();
            assert!(
                matches!(bitsize, BitSize::Two | BitSize::Four),
                "Bitsize should be Two or Four"
            );
        }
    }

    /// Smoke test: `flag()` must not panic; whether it is `Some` depends on
    /// the file header, which this fixture does not pin down.
    ///
    /// NOTE(review): the previous assertion (`flag.is_some() || flag.is_none()`)
    /// was a tautology; the binding alone preserves the "method is callable"
    /// intent without the meaningless assert.
    #[test]
    fn test_ref_record_flag() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let _flag = record.flag();
        }
    }

    /// For paired files with a non-zero extended length, the extended buffer
    /// must be populated.
    #[test]
    fn test_ref_record_paired_data() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if reader.is_paired() {
            if let Ok(record) = reader.get(0) {
                let xbuf = record.xbuf();
                let xlen = record.xlen();

                if xlen > 0 {
                    assert!(
                        !xbuf.is_empty(),
                        "Extended buffer should not be empty for paired"
                    );
                }
            }
        }
    }

    // ==================== Error Handling Tests ====================

    /// A missing path must produce an error, not a panic.
    #[test]
    fn test_nonexistent_file() {
        let result = MmapReader::new("./data/nonexistent.bq");
        assert!(result.is_err(), "Should fail on nonexistent file");
    }

    /// Opening a non-BQ file must not panic, whatever the outcome.
    #[test]
    fn test_invalid_file_format() {
        // Try to open a non-BQ file as BQ (use Cargo.toml for example)
        let result = MmapReader::new("./Cargo.toml");
        // This should either fail to open or fail validation
        if let Ok(reader) = result {
            // If it opens, try to access records (should fail or have issues)
            let num_records = reader.num_records();
            // The number might be nonsensical for invalid data
            let _ = num_records; // Just verify it doesn't panic
        }
    }

    // ==================== Multiple Records Tests ====================

    /// Walking the first few records in order must yield matching indices.
    #[test]
    fn test_sequential_record_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records().min(10);

        for i in 0..num_records {
            let record = reader.get(i);
            assert!(record.is_ok(), "Should get record at index {}", i);
            assert_eq!(
                record.unwrap().index() as usize,
                i,
                "Record index mismatch at {}",
                i
            );
        }
    }

    /// Jumping to scattered indices (first, early, middle, last) must work —
    /// exercising the random-access path of the memory map.
    #[test]
    fn test_random_record_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        if num_records > 10 {
            let indices = [0, 5, num_records / 2, num_records - 1];

            for &idx in &indices {
                let record = reader.get(idx);
                assert!(record.is_ok(), "Should get record at index {}", idx);
                assert_eq!(record.unwrap().index() as usize, idx);
            }
        }
    }
}