// binseq/bq/reader.rs
1//! Binary sequence reader module
2//!
3//! This module provides functionality for reading binary sequence files using either:
4//! 1. Memory mapping for efficient access to entire files
5//! 2. Streaming for processing data as it arrives
6//!
7//! It supports both sequential and parallel processing of records,
8//! with configurable record layouts for different sequence types.
9
10use std::fs::File;
11use std::io::Read;
12use std::ops::Range;
13use std::path::Path;
14use std::sync::Arc;
15
16use bitnuc::BitSize;
17use bytemuck::cast_slice;
18use memmap2::Mmap;
19
20use super::header::{FileHeader, SIZE_HEADER};
21use crate::{
22    BinseqRecord, DEFAULT_QUALITY_SCORE, Error, ParallelProcessor, ParallelReader,
23    error::{ReadError, Result},
24};
25
/// A reference to a binary sequence record in a memory-mapped file
///
/// This struct provides a view into a single record within a binary sequence file,
/// allowing access to the record's components (sequence data, flags, etc.) without
/// copying the data from the memory-mapped file.
///
/// The record's data is stored in a compact binary format where:
/// - The first u64 contains flags (only when the configuration enables flags)
/// - Subsequent u64s contain the primary sequence data
/// - If present, final u64s contain the extended sequence data
#[derive(Clone, Copy)]
pub struct RefRecord<'a> {
    /// The position (index) of this record in the file (0-based record index, not byte offset)
    id: u64,
    /// The underlying u64 buffer representing the record's binary data
    buffer: &'a [u64],
    /// Reusable default quality buffer shared with the reader; sliced by
    /// `squal`/`xqual` to provide constant quality scores
    qbuf: &'a [u8],
    /// The configuration that defines the layout and size of record components
    config: RecordConfig,
    /// Cached header string for the record, populated via `set_id`
    /// (fixed 20-byte capacity — enough for a decimal u64 index)
    header_buf: [u8; 20],
    /// Length of the valid prefix of `header_buf` in bytes
    header_len: usize,
}
51impl<'a> RefRecord<'a> {
52    /// Creates a new record reference
53    ///
54    /// # Arguments
55    ///
56    /// * `id` - The record's position in the file (0-based record index, not byte offset)
57    /// * `buffer` - The u64 slice containing the record's binary data
58    /// * `config` - Configuration defining the record's layout
59    ///
60    /// # Panics
61    ///
62    /// Panics if the buffer length doesn't match the expected size from the config
63    #[must_use]
64    pub fn new(id: u64, buffer: &'a [u64], qbuf: &'a [u8], config: RecordConfig) -> Self {
65        assert_eq!(buffer.len(), config.record_size_u64());
66        Self {
67            id,
68            buffer,
69            qbuf,
70            config,
71            header_buf: [0; 20],
72            header_len: 0,
73        }
74    }
75    /// Returns the record's configuration
76    ///
77    /// The configuration defines the layout and size of the record's components.
78    #[must_use]
79    pub fn config(&self) -> RecordConfig {
80        self.config
81    }
82
83    pub fn set_id(&mut self, id: &[u8]) {
84        self.header_len = id.len();
85        self.header_buf[..self.header_len].copy_from_slice(id);
86    }
87}
88
89impl BinseqRecord for RefRecord<'_> {
90    fn bitsize(&self) -> BitSize {
91        self.config.bitsize
92    }
93    fn index(&self) -> u64 {
94        self.id
95    }
96    /// Clear the buffer and fill it with the sequence header
97    fn sheader(&self) -> &[u8] {
98        &self.header_buf[..self.header_len]
99    }
100
101    /// Clear the buffer and fill it with the extended header
102    fn xheader(&self) -> &[u8] {
103        self.sheader()
104    }
105
106    fn flag(&self) -> Option<u64> {
107        if self.config.flags {
108            Some(self.buffer[0])
109        } else {
110            None
111        }
112    }
113    fn slen(&self) -> u64 {
114        self.config.slen
115    }
116    fn xlen(&self) -> u64 {
117        self.config.xlen
118    }
119    fn sbuf(&self) -> &[u64] {
120        if self.config.flags {
121            &self.buffer[1..=(self.config.schunk as usize)]
122        } else {
123            &self.buffer[..(self.config.schunk as usize)]
124        }
125    }
126    fn xbuf(&self) -> &[u64] {
127        if self.config.flags {
128            &self.buffer[1 + self.config.schunk as usize..]
129        } else {
130            &self.buffer[self.config.schunk as usize..]
131        }
132    }
133    fn squal(&self) -> &[u8] {
134        &self.qbuf[..self.config.slen as usize]
135    }
136    fn xqual(&self) -> &[u8] {
137        &self.qbuf[..self.config.xlen as usize]
138    }
139}
140
/// A reference to a record in the map with a precomputed decoded buffer slice
///
/// Unlike `RefRecord`, the nucleotide payload has already been decoded for the
/// whole batch, so `sseq`/`xseq` can return slices without re-decoding.
pub struct BatchRecord<'a> {
    /// Unprocessed buffer slice (with flags)
    buffer: &'a [u64],
    /// Decoded buffer slice; laid out as [flag pad][primary][extended] with
    /// each section padded to whole words (see `sseq`/`xseq` slicing)
    dbuf: &'a [u8],
    /// Record ID
    id: u64,
    /// The configuration that defines the layout and size of record components
    config: RecordConfig,
    /// A reusable pre-initialized quality score buffer
    qbuf: &'a [u8],
    /// Cached header string for the record
    /// (fixed 20-byte capacity — enough for a decimal u64 index)
    header_buf: [u8; 20],
    /// Length of the valid prefix of `header_buf` in bytes
    header_len: usize,
}
158impl BinseqRecord for BatchRecord<'_> {
159    fn bitsize(&self) -> BitSize {
160        self.config.bitsize
161    }
162    fn index(&self) -> u64 {
163        self.id
164    }
165    /// Clear the buffer and fill it with the sequence header
166    fn sheader(&self) -> &[u8] {
167        &self.header_buf[..self.header_len]
168    }
169
170    /// Clear the buffer and fill it with the extended header
171    fn xheader(&self) -> &[u8] {
172        self.sheader()
173    }
174
175    fn flag(&self) -> Option<u64> {
176        if self.config.flags {
177            Some(self.buffer[0])
178        } else {
179            None
180        }
181    }
182    fn slen(&self) -> u64 {
183        self.config.slen
184    }
185    fn xlen(&self) -> u64 {
186        self.config.xlen
187    }
188    fn sbuf(&self) -> &[u64] {
189        if self.config.flags {
190            &self.buffer[1..=(self.config.schunk as usize)]
191        } else {
192            &self.buffer[..(self.config.schunk as usize)]
193        }
194    }
195    fn xbuf(&self) -> &[u64] {
196        if self.config.flags {
197            &self.buffer[1 + self.config.schunk as usize..]
198        } else {
199            &self.buffer[self.config.schunk as usize..]
200        }
201    }
202    fn decode_s(&self, dbuf: &mut Vec<u8>) -> Result<()> {
203        dbuf.extend_from_slice(self.sseq());
204        Ok(())
205    }
206    fn decode_x(&self, dbuf: &mut Vec<u8>) -> Result<()> {
207        dbuf.extend_from_slice(self.xseq());
208        Ok(())
209    }
210    /// Override this method since we can make use of block information
211    fn sseq(&self) -> &[u8] {
212        let scalar = self.config.scalar();
213        let mut lbound = 0;
214        let mut rbound = self.config.slen();
215        if self.config.flags {
216            lbound += scalar;
217            rbound += scalar;
218        }
219        &self.dbuf[lbound..rbound]
220    }
221    /// Override this method since we can make use of block information
222    fn xseq(&self) -> &[u8] {
223        let scalar = self.config.scalar();
224        let mut lbound = scalar * self.config.schunk();
225        let mut rbound = lbound + self.config.xlen();
226        if self.config.flags {
227            lbound += scalar;
228            rbound += scalar;
229        }
230        &self.dbuf[lbound..rbound]
231    }
232    fn squal(&self) -> &[u8] {
233        &self.qbuf[..self.config.slen()]
234    }
235    fn xqual(&self) -> &[u8] {
236        &self.qbuf[..self.config.xlen()]
237    }
238}
239
/// Configuration for binary sequence record layout
///
/// This struct defines the size and layout of binary sequence records,
/// including both primary sequence data and optional extended data.
/// It handles the translation between sequence lengths in base pairs
/// and the number of u64 chunks needed to store the compressed data.
#[derive(Clone, Copy)]
pub struct RecordConfig {
    /// The primary sequence length in base pairs
    slen: u64,
    /// The extended sequence length in base pairs
    xlen: u64,
    /// The number of u64 chunks needed to store the primary sequence
    /// (32 nucleotides per u64 at 2-bit encoding, 16 at 4-bit — see `new`)
    schunk: u64,
    /// The number of u64 chunks needed to store the extended sequence
    /// (32 values per u64 at 2-bit encoding, 16 at 4-bit — see `new`)
    xchunk: u64,
    /// The bitsize of the record (2-bit or 4-bit encoding)
    bitsize: BitSize,
    /// Whether a flag word precedes each record's sequence data
    flags: bool,
}
263impl RecordConfig {
264    /// Creates a new record configuration
265    ///
266    /// This constructor initializes a configuration for a binary sequence record
267    /// with specified primary and extended sequence lengths.
268    ///
269    /// # Arguments
270    ///
271    /// * `slen` - The length of primary sequences in the file
272    /// * `xlen` - The length of secondary/extended sequences in the file
273    /// * `bitsize` - The bitsize of the record
274    /// * `flags` - Whether flags are present
275    ///
276    /// # Returns
277    ///
278    /// A new `RecordConfig` instance with the specified sequence lengths
279    pub fn new(slen: usize, xlen: usize, bitsize: BitSize, flags: bool) -> Self {
280        let (schunk, xchunk) = match bitsize {
281            BitSize::Two => (slen.div_ceil(32), xlen.div_ceil(32)),
282            BitSize::Four => (slen.div_ceil(16), xlen.div_ceil(16)),
283        };
284        Self {
285            slen: slen as u64,
286            xlen: xlen as u64,
287            schunk: schunk as u64,
288            xchunk: xchunk as u64,
289            bitsize,
290            flags,
291        }
292    }
293
294    /// Creates a new record configuration from a header
295    ///
296    /// This constructor initializes a configuration based on a header that contains
297    /// the sequence lengths for primary and extended sequences.
298    ///
299    /// # Arguments
300    ///
301    /// * `header` - A reference to a `FileHeader` containing sequence lengths
302    ///
303    /// # Returns
304    ///
305    /// A new `RecordConfig` instance with the sequence lengths from the header
306    pub fn from_header(header: &FileHeader) -> Self {
307        Self::new(
308            header.slen as usize,
309            header.xlen as usize,
310            header.bits,
311            header.flags,
312        )
313    }
314
315    /// Returns whether this record contains extended sequence data
316    ///
317    /// A record is considered paired if it has a non-zero extended sequence length.
318    pub fn paired(&self) -> bool {
319        self.xlen > 0
320    }
321
322    /// Returns the primary sequence length in base pairs
323    ///
324    /// This method returns the length of the primary sequence in base pairs.
325    pub fn slen(&self) -> usize {
326        self.slen as usize
327    }
328
329    /// Returns the extended sequence length in base pairs
330    ///
331    /// This method returns the length of the extended sequence in base pairs.
332    pub fn xlen(&self) -> usize {
333        self.xlen as usize
334    }
335
336    /// Returns the number of u64 chunks needed to store the primary sequence
337    ///
338    /// This method returns the number of u64 chunks required to store the primary
339    /// sequence, where each u64 stores 32 nucleotides.
340    pub fn schunk(&self) -> usize {
341        self.schunk as usize
342    }
343
344    /// Returns the number of u64 chunks needed to store the extended sequence
345    ///
346    /// This method returns the number of u64 chunks required to store the extended
347    /// sequence, where each u64 stores 32 values.
348    pub fn xchunk(&self) -> usize {
349        self.xchunk as usize
350    }
351
352    /// Returns the full record size in bytes (u8):
353    /// 8 * (schunk + xchunk + 1 (flag))
354    pub fn record_size_bytes(&self) -> usize {
355        8 * self.record_size_u64()
356    }
357
358    /// Returns the full record size in u64
359    /// schunk + xchunk + 1 (flag)
360    pub fn record_size_u64(&self) -> usize {
361        if self.flags {
362            (self.schunk + self.xchunk + 1) as usize
363        } else {
364            (self.schunk + self.xchunk) as usize
365        }
366    }
367
368    /// The number of nucleotides per word
369    pub fn scalar(&self) -> usize {
370        match self.bitsize {
371            BitSize::Two => 32,
372            BitSize::Four => 16,
373        }
374    }
375}
376
/// A memory-mapped reader for binary sequence files
///
/// This reader provides efficient access to binary sequence files by memory-mapping
/// them instead of performing traditional I/O operations. It supports both
/// sequential access to individual records and parallel processing of records
/// across multiple threads.
///
/// The reader ensures thread-safety through the use of `Arc` for sharing the
/// memory-mapped data between threads.
///
/// Records are returned as [`RefRecord`] which implement the [`BinseqRecord`] trait.
///
/// # Examples
///
/// ```
/// use binseq::bq::MmapReader;
/// use binseq::Result;
///
/// fn main() -> Result<()> {
///     let path = "./data/subset.bq";
///     let reader = MmapReader::new(path)?;
///
///     // Calculate the number of records in the file
///     let num_records = reader.num_records();
///     println!("Number of records: {}", num_records);
///
///     // Get the record at index 20 (0-indexed)
///     let record = reader.get(20)?;
///
///     Ok(())
/// }
/// ```
pub struct MmapReader {
    /// Memory mapped file contents, wrapped in Arc for thread-safe sharing
    mmap: Arc<Mmap>,

    /// Binary sequence file header containing format information
    header: FileHeader,

    /// Configuration defining the layout of records in the file
    config: RecordConfig,

    /// Reusable buffer of repeated quality scores, lent out to every
    /// [`RefRecord`] produced by `get`
    qbuf: Vec<u8>,

    /// Default quality score for records without quality scores
    /// (used to fill `qbuf`)
    default_quality_score: u8,
}
425
426impl MmapReader {
427    /// Creates a new memory-mapped reader for a binary sequence file
428    ///
429    /// This method opens the file, memory-maps its contents, and validates
430    /// the file structure to ensure it contains valid binary sequence data.
431    ///
432    /// # Arguments
433    ///
434    /// * `path` - Path to the binary sequence file
435    ///
436    /// # Returns
437    ///
438    /// * `Ok(MmapReader)` - A new reader if the file is valid
439    /// * `Err(Error)` - If the file is invalid or cannot be opened
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if:
444    /// * The file cannot be opened
445    /// * The file is not a regular file
446    /// * The file header is invalid
447    /// * The file size doesn't match the expected size based on the header
448    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
449        // Verify input file is a file before attempting to map
450        let file = File::open(path)?;
451        if !file.metadata()?.is_file() {
452            return Err(ReadError::IncompatibleFile.into());
453        }
454
455        // Safety: the file is open and won't be modified while mapped
456        let mmap = unsafe { Mmap::map(&file)? };
457
458        // Read header from mapped memory
459        let header = FileHeader::from_buffer(&mmap)?;
460
461        // Record configuraration
462        let config = RecordConfig::from_header(&header);
463
464        // Immediately validate the size of the file against the expected byte size of records
465        if !(mmap.len() - SIZE_HEADER).is_multiple_of(config.record_size_bytes()) {
466            return Err(ReadError::FileTruncation(mmap.len()).into());
467        }
468
469        // preinitialize quality buffer
470        let qbuf = vec![DEFAULT_QUALITY_SCORE; header.slen.max(header.xlen) as usize];
471
472        Ok(Self {
473            mmap: Arc::new(mmap),
474            header,
475            config,
476            qbuf,
477            default_quality_score: DEFAULT_QUALITY_SCORE,
478        })
479    }
480
481    /// Returns the total number of records in the file
482    ///
483    /// This is calculated by subtracting the header size from the total file size
484    /// and dividing by the size of each record.
485    #[must_use]
486    pub fn num_records(&self) -> usize {
487        (self.mmap.len() - SIZE_HEADER) / self.config.record_size_bytes()
488    }
489
490    /// Returns a copy of the binary sequence file header
491    ///
492    /// The header contains format information and sequence length specifications.
493    #[must_use]
494    pub fn header(&self) -> FileHeader {
495        self.header
496    }
497
498    /// Checks if the file has paired-records
499    #[must_use]
500    pub fn is_paired(&self) -> bool {
501        self.header.is_paired()
502    }
503
504    /// Sets the default quality score for records without quality information
505    pub fn set_default_quality_score(&mut self, score: u8) {
506        self.default_quality_score = score;
507        self.qbuf = self.build_qbuf();
508    }
509
510    /// Creates a new quality score buffer
511    #[must_use]
512    pub fn build_qbuf(&self) -> Vec<u8> {
513        vec![self.default_quality_score; self.header.slen.max(self.header.xlen) as usize]
514    }
515
516    /// Returns a reference to a specific record
517    ///
518    /// # Arguments
519    ///
520    /// * `idx` - The index of the record to retrieve (0-based)
521    ///
522    /// # Returns
523    ///
524    /// * `Ok(RefRecord)` - A reference to the requested record
525    /// * `Err(Error)` - If the index is out of bounds
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if the requested index is beyond the number of records in the file
530    pub fn get(&self, idx: usize) -> Result<RefRecord<'_>> {
531        if idx > self.num_records() {
532            return Err(ReadError::OutOfRange {
533                requested_index: idx,
534                max_index: self.num_records(),
535            }
536            .into());
537        }
538        let rsize = self.config.record_size_bytes();
539        let lbound = SIZE_HEADER + (idx * rsize);
540        let rbound = lbound + rsize;
541        let bytes = &self.mmap[lbound..rbound];
542        let buffer = cast_slice(bytes);
543        Ok(RefRecord::new(idx as u64, buffer, &self.qbuf, self.config))
544    }
545
546    /// Returns a slice of the buffer containing the underlying u64 for that range
547    /// of records.
548    ///
549    /// Note: range 10..40 will return all u64s in the mmap between the record index 10 and 40
550    pub fn get_buffer_slice(&self, range: Range<usize>) -> Result<&[u64]> {
551        if range.end > self.num_records() {
552            return Err(ReadError::OutOfRange {
553                requested_index: range.end,
554                max_index: self.num_records(),
555            }
556            .into());
557        }
558        let rsize = self.config.record_size_bytes();
559        let total_records = range.end - range.start;
560        let lbound = SIZE_HEADER + (range.start * rsize);
561        let rbound = lbound + (total_records * rsize);
562        let bytes = &self.mmap[lbound..rbound];
563        let buffer = cast_slice(bytes);
564        Ok(buffer)
565    }
566}
567
568/// A reader for streaming binary sequence data from any source that implements Read
569///
570/// Unlike `MmapReader` which requires the entire file to be accessible at once,
571/// `StreamReader` processes data as it becomes available, making it suitable for:
572/// - Processing data as it arrives over a network
573/// - Handling very large files that exceed available memory
574/// - Pipeline processing where data is flowing continuously
575///
576/// The reader maintains an internal buffer and can handle partial record reconstruction
577/// across chunk boundaries.
578pub struct StreamReader<R: Read> {
579    /// The source reader for binary sequence data
580    reader: R,
581
582    /// Binary sequence file header containing format information
583    header: Option<FileHeader>,
584
585    /// Configuration defining the layout of records in the file
586    config: Option<RecordConfig>,
587
588    /// Buffer for storing incoming data
589    buffer: Vec<u8>,
590
591    /// Buffer for reusable quality scores
592    qbuf: Vec<u8>,
593
594    /// Default quality score for records without quality information
595    default_quality_score: u8,
596
597    /// Current position in the buffer
598    buffer_pos: usize,
599
600    /// Length of valid data in the buffer
601    buffer_len: usize,
602}
603
604impl<R: Read> StreamReader<R> {
605    /// Creates a new `StreamReader` with the default buffer size
606    ///
607    /// This constructor initializes a `StreamReader` that will read from the provided
608    /// source, using an 8K default buffer size.
609    ///
610    /// # Arguments
611    ///
612    /// * `reader` - The source to read binary sequence data from
613    ///
614    /// # Returns
615    ///
616    /// A new `StreamReader` instance
617    pub fn new(reader: R) -> Self {
618        Self::with_capacity(reader, 8192)
619    }
620
621    /// Creates a new `StreamReader` with a specified buffer capacity
622    ///
623    /// This constructor initializes a `StreamReader` with a custom buffer size,
624    /// which can be tuned based on the expected usage pattern.
625    ///
626    /// # Arguments
627    ///
628    /// * `reader` - The source to read binary sequence data from
629    /// * `capacity` - The size of the internal buffer in bytes
630    ///
631    /// # Returns
632    ///
633    /// A new `StreamReader` instance with the specified buffer capacity
634    pub fn with_capacity(reader: R, capacity: usize) -> Self {
635        Self {
636            reader,
637            header: None,
638            config: None,
639            buffer: vec![0; capacity],
640            qbuf: vec![0; capacity],
641            buffer_pos: 0,
642            buffer_len: 0,
643            default_quality_score: DEFAULT_QUALITY_SCORE,
644        }
645    }
646
647    /// Sets the default quality score for records without quality information
648    pub fn set_default_quality_score(&mut self, score: u8) {
649        if score != self.default_quality_score {
650            self.qbuf.clear();
651        }
652        self.default_quality_score = score;
653    }
654
655    /// Reads and validates the header from the underlying reader
656    ///
657    /// This method reads the binary sequence file header and validates it.
658    /// It caches the header internally for future use.
659    ///
660    /// # Returns
661    ///
662    /// * `Ok(&FileHeader)` - A reference to the validated header
663    /// * `Err(Error)` - If reading or validating the header fails
664    ///
665    /// # Panics
666    ///
667    /// Panics if the header is missing when expected in the stream.
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if:
672    /// * There is an I/O error when reading from the source
673    /// * The header data is invalid
674    /// * End of stream is reached before the full header can be read
675    pub fn read_header(&mut self) -> Result<&FileHeader> {
676        if self.header.is_some() {
677            return Ok(self
678                .header
679                .as_ref()
680                .expect("Missing header when expected in stream"));
681        }
682
683        // Ensure we have enough data for the header
684        while self.buffer_len - self.buffer_pos < SIZE_HEADER {
685            self.fill_buffer()?;
686        }
687
688        // Parse header
689        let header_slice = &self.buffer[self.buffer_pos..self.buffer_pos + SIZE_HEADER];
690        let header = FileHeader::from_buffer(header_slice)?;
691
692        self.header = Some(header);
693        self.config = Some(RecordConfig::from_header(&header));
694        self.buffer_pos += SIZE_HEADER;
695
696        Ok(self.header.as_ref().unwrap())
697    }
698
699    /// Fills the internal buffer with more data from the reader
700    ///
701    /// This method reads more data from the underlying reader, handling
702    /// the case where some unprocessed data remains in the buffer.
703    ///
704    /// # Returns
705    ///
706    /// * `Ok(())` - If the buffer was successfully filled with new data
707    /// * `Err(Error)` - If reading from the source fails
708    ///
709    /// # Errors
710    ///
711    /// Returns an error if:
712    /// * There is an I/O error when reading from the source
713    /// * End of stream is reached (no more data available)
714    fn fill_buffer(&mut self) -> Result<()> {
715        // Move remaining data to beginning of buffer if needed
716        if self.buffer_pos > 0 && self.buffer_pos < self.buffer_len {
717            self.buffer.copy_within(self.buffer_pos..self.buffer_len, 0);
718            self.buffer_len -= self.buffer_pos;
719            self.buffer_pos = 0;
720        } else if self.buffer_pos == self.buffer_len {
721            self.buffer_len = 0;
722            self.buffer_pos = 0;
723        }
724
725        // Read more data
726        let bytes_read = self.reader.read(&mut self.buffer[self.buffer_len..])?;
727        if bytes_read == 0 {
728            return Err(ReadError::EndOfStream.into());
729        }
730
731        self.buffer_len += bytes_read;
732        Ok(())
733    }
734
735    /// Retrieves the next record from the stream
736    ///
737    /// This method reads and processes the next complete record from the stream.
738    /// It handles the case where a record spans multiple buffer fills.
739    ///
740    /// # Returns
741    ///
742    /// * `Ok(Some(RefRecord))` - The next record was successfully read
743    /// * `Ok(None)` - End of stream was reached (no more records)
744    /// * `Err(Error)` - If an error occurred during reading
745    ///
746    /// # Panics
747    ///
748    /// Panics if the configuration is missing when expected in the stream.
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if:
753    /// * There is an I/O error when reading from the source
754    /// * The header has not been read yet
755    /// * The data format is invalid
756    pub fn next_record(&mut self) -> Option<Result<RefRecord<'_>>> {
757        // Ensure header is read
758        if self.header.is_none()
759            && let Some(e) = self.read_header().err()
760        {
761            return Some(Err(e));
762        }
763
764        let config = self
765            .config
766            .expect("Missing configuration when expected in stream");
767        let record_size = config.record_size_bytes();
768
769        // Ensure we have enough data for a complete record
770        while self.buffer_len - self.buffer_pos < record_size {
771            match self.fill_buffer() {
772                Ok(()) => {}
773                Err(Error::ReadError(ReadError::EndOfStream)) => {
774                    // End of stream reached - if we have any partial data, it's an error
775                    if self.buffer_len - self.buffer_pos > 0 {
776                        return Some(Err(ReadError::PartialRecord(
777                            self.buffer_len - self.buffer_pos,
778                        )
779                        .into()));
780                    }
781                    return None;
782                }
783                Err(e) => return Some(Err(e)),
784            }
785        }
786
787        // Process record
788        let record_start = self.buffer_pos;
789        self.buffer_pos += record_size;
790
791        let record_bytes = &self.buffer[record_start..record_start + record_size];
792        let record_u64s = cast_slice(record_bytes);
793
794        // update quality score buffer if necessary
795        if self.qbuf.is_empty() {
796            let max_size = config.slen.max(config.xlen) as usize;
797            self.qbuf.resize(max_size, self.default_quality_score);
798        }
799
800        // Create record with incremental ID (based on read position)
801        let id = (record_start - SIZE_HEADER) / record_size;
802        Some(Ok(RefRecord::new(
803            id as u64,
804            record_u64s,
805            &self.qbuf,
806            config,
807        )))
808    }
809
810    /// Consumes the stream reader and returns the inner reader
811    ///
812    /// This method is useful when you need access to the underlying reader
813    /// after processing is complete.
814    ///
815    /// # Returns
816    ///
817    /// The inner reader that was used by this `StreamReader`
818    pub fn into_inner(self) -> R {
819        self.reader
820    }
821}
822
/// Default batch size for parallel processing
///
/// Each worker thread fetches and decodes records in contiguous chunks of
/// this many records at a time during parallel processing operations.
pub const BATCH_SIZE: usize = 1024;
828
829/// Parallel processing implementation for memory-mapped readers
830impl ParallelReader for MmapReader {
831    /// Processes all records in parallel using multiple threads
832    ///
833    /// This method distributes the records across the specified number of threads
834    /// and processes them using the provided processor. Each thread receives its
835    /// own clone of the processor and processes a contiguous chunk of records.
836    ///
837    /// # Arguments
838    ///
839    /// * `processor` - The processor to use for handling records
840    /// * `num_threads` - The number of threads to use for processing
841    ///
842    /// # Type Parameters
843    ///
844    /// * `P` - A type that implements `ParallelProcessor` and can be cloned
845    ///
846    /// # Returns
847    ///
848    /// * `Ok(())` - If all records were processed successfully
849    /// * `Err(Error)` - If an error occurred during processing
850    fn process_parallel<P: ParallelProcessor + Clone + 'static>(
851        self,
852        processor: P,
853        num_threads: usize,
854    ) -> Result<()> {
855        let num_records = self.num_records();
856        self.process_parallel_range(processor, num_threads, 0..num_records)
857    }
858
859    /// Process records in parallel within a specified range
860    ///
861    /// This method allows parallel processing of a subset of records within the file,
862    /// defined by a start and end index. The range is distributed across the specified
863    /// number of threads.
864    ///
865    /// # Arguments
866    ///
867    /// * `processor` - The processor to use for each record
868    /// * `num_threads` - The number of threads to spawn
869    /// * `range` - The range of record indices to process
870    ///
871    /// # Type Parameters
872    ///
873    /// * `P` - A type that implements `ParallelProcessor` and can be cloned
874    ///
875    /// # Returns
876    ///
877    /// * `Ok(())` - If all records were processed successfully
878    /// * `Err(Error)` - If an error occurred during processing
879    fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
880        self,
881        processor: P,
882        num_threads: usize,
883        range: Range<usize>,
884    ) -> Result<()> {
885        // Calculate the number of threads to use
886        let num_threads = if num_threads == 0 {
887            num_cpus::get()
888        } else {
889            num_threads.min(num_cpus::get())
890        };
891
892        // Validate range
893        let num_records = self.num_records();
894        self.validate_range(num_records, &range)?;
895
896        // Calculate number of records for each thread within the range
897        let range_size = range.end - range.start;
898        let records_per_thread = range_size.div_ceil(num_threads);
899
900        // Arc self
901        let reader = Arc::new(self);
902
903        // Build thread handles
904        let mut handles = Vec::new();
905        for tid in 0..num_threads {
906            let mut processor = processor.clone();
907            let reader = reader.clone();
908            processor.set_tid(tid);
909
910            let handle = std::thread::spawn(move || -> Result<()> {
911                let start_idx = range.start + tid * records_per_thread;
912                let end_idx = (start_idx + records_per_thread).min(range.end);
913
914                if start_idx >= end_idx {
915                    return Ok(()); // No records for this thread
916                }
917
918                // create a reusable buffer for translating record IDs
919                let mut translater = itoa::Buffer::new();
920
921                // initialize a decoding buffer
922                let mut dbuf = Vec::new();
923
924                // initialize a quality score buffer
925                let qbuf = reader.build_qbuf();
926
927                // calculate the size of a record in the cast u64 slice
928                let rsize_u64 = reader.config.record_size_bytes() / 8;
929
930                // determine the required scalar size
931                let scalar = reader.config.scalar();
932
933                // calculate the size of a record in the batch decoded buffer
934                let mut dbuf_rsize = { (reader.config.schunk() + reader.config.xchunk()) * scalar };
935                if reader.config.flags {
936                    dbuf_rsize += scalar;
937                }
938
939                // iterate over the range of indices
940                for range_start in (start_idx..end_idx).step_by(BATCH_SIZE) {
941                    let range_end = (range_start + BATCH_SIZE).min(end_idx);
942
943                    // clear the decoded buffer
944                    dbuf.clear();
945
946                    // get the encoded buffer slice
947                    let ebuf = reader.get_buffer_slice(range_start..range_end)?;
948
949                    // decode the entire buffer at once (with flags and extra bases)
950                    reader
951                        .config
952                        .bitsize
953                        .decode(ebuf, ebuf.len() * scalar, &mut dbuf)?;
954
955                    // iterate over each index in the range
956                    for (inner_idx, idx) in (range_start..range_end).enumerate() {
957                        // translate the index
958                        let id_str = translater.format(idx);
959
960                        // create the index buffer
961                        let mut header_buf = [0; 20];
962                        let header_len = id_str.len();
963                        header_buf[..header_len].copy_from_slice(id_str.as_bytes());
964
965                        // find the buffer starts
966                        let ebuf_start = inner_idx * rsize_u64;
967                        let dbuf_start = inner_idx * dbuf_rsize;
968
969                        // initialize the record
970                        let record = BatchRecord {
971                            buffer: &ebuf[ebuf_start..(ebuf_start + rsize_u64)],
972                            dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize)],
973                            qbuf: &qbuf,
974                            id: idx as u64,
975                            config: reader.config,
976                            header_buf,
977                            header_len,
978                        };
979
980                        // process the record
981                        processor.process_record(record)?;
982                    }
983
984                    // process the batch
985                    processor.on_batch_complete()?;
986                }
987
988                Ok(())
989            });
990
991            handles.push(handle);
992        }
993
994        for handle in handles {
995            handle
996                .join()
997                .expect("Error joining handle (1)")
998                .expect("Error joining handle (2)");
999        }
1000
1001        Ok(())
1002    }
1003}
1004
#[cfg(test)]
mod tests {
    //! Integration-style tests for `MmapReader` against a small on-disk
    //! fixture (`./data/subset.bq`). Tests that depend on fixture properties
    //! (record count, pairedness) guard themselves with runtime checks.
    use super::*;
    use crate::BinseqRecord;
    use bitnuc::BitSize;

    /// Path to the fixture file used by every test below.
    const TEST_BQ_FILE: &str = "./data/subset.bq";

    // ==================== MmapReader Basic Tests ====================

    #[test]
    fn test_mmap_reader_new() {
        let reader = MmapReader::new(TEST_BQ_FILE);
        assert!(reader.is_ok(), "Failed to create reader");
    }

    #[test]
    fn test_mmap_reader_num_records() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();
        assert!(num_records > 0, "Expected non-zero records");
    }

    #[test]
    fn test_mmap_reader_is_paired() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        // Smoke test: the method must return without panicking. The previous
        // assertion (`is_paired || !is_paired`) was a tautology that could
        // never fail, so it is replaced with an explicitly-typed binding.
        let _is_paired: bool = reader.is_paired();
    }

    #[test]
    fn test_mmap_reader_header_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        assert!(header.slen > 0, "Expected non-zero sequence length");
    }

    #[test]
    fn test_mmap_reader_config_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        let config = RecordConfig::from_header(&header);
        assert!(
            config.slen > 0,
            "Expected non-zero sequence length in config"
        );
    }

    // ==================== Record Access Tests ====================

    #[test]
    fn test_get_record() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        if num_records > 0 {
            let record = reader.get(0);
            assert!(record.is_ok(), "Expected to get first record");

            let record = record.unwrap();
            assert_eq!(record.index(), 0, "Expected record index to be 0");
        }
    }

    #[test]
    fn test_get_record_out_of_bounds() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        let record = reader.get(num_records + 100);
        assert!(record.is_err(), "Expected error for out of bounds index");
    }

    #[test]
    fn test_record_sequence_data() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let sbuf = record.sbuf();
            assert!(!sbuf.is_empty(), "Expected non-empty sequence buffer");

            let slen = record.slen();
            assert!(slen > 0, "Expected non-zero sequence length");
        }
    }

    #[test]
    fn test_record_quality_data() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let squal = record.squal();
            let slen = record.slen() as usize;
            assert_eq!(
                squal.len(),
                slen,
                "Quality length should match sequence length"
            );
        }
    }

    // ==================== Default Quality Score Tests ====================

    #[test]
    fn test_set_default_quality_score() {
        let mut reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let custom_score = 42u8;

        reader.set_default_quality_score(custom_score);

        if let Ok(record) = reader.get(0) {
            let squal = record.squal();
            // All quality scores should be the custom score
            assert!(
                squal.iter().all(|&q| q == custom_score),
                "All quality scores should be {}",
                custom_score
            );
        }
    }

    // ==================== Parallel Processing Tests ====================

    /// Minimal processor that counts how many records it was handed.
    #[derive(Clone)]
    struct CountingProcessor {
        count: Arc<std::sync::Mutex<usize>>,
    }

    impl ParallelProcessor for CountingProcessor {
        fn process_record<R: BinseqRecord>(&mut self, _record: R) -> Result<()> {
            let mut count = self.count.lock().unwrap();
            *count += 1;
            Ok(())
        }
    }

    #[test]
    fn test_parallel_processing() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        let count = Arc::new(std::sync::Mutex::new(0));
        let processor = CountingProcessor {
            count: count.clone(),
        };

        reader.process_parallel(processor, 2).unwrap();

        let final_count = *count.lock().unwrap();
        assert_eq!(final_count, num_records, "All records should be processed");
    }

    #[test]
    fn test_parallel_processing_range() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        if num_records >= 100 {
            let start = 10;
            let end = 50;
            let expected_count = end - start;

            let count = Arc::new(std::sync::Mutex::new(0));
            let processor = CountingProcessor {
                count: count.clone(),
            };

            reader
                .process_parallel_range(processor, 2, start..end)
                .unwrap();

            let final_count = *count.lock().unwrap();
            assert_eq!(
                final_count, expected_count,
                "Should process exactly {} records",
                expected_count
            );
        }
    }

    // ==================== RecordConfig Tests ====================

    #[test]
    fn test_record_config_from_header() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        let config = RecordConfig::from_header(&header);

        assert_eq!(config.slen, header.slen as u64, "Sequence length mismatch");
        assert_eq!(config.xlen, header.xlen as u64, "Extended length mismatch");
        assert_eq!(config.bitsize, header.bits, "Bit size mismatch");
    }

    #[test]
    fn test_record_config_record_size() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let header = reader.header();
        let config = RecordConfig::from_header(&header);

        let size_u64 = config.record_size_u64();
        assert!(size_u64 > 0, "Record size should be non-zero");

        let size_bytes = config.record_size_bytes();
        assert_eq!(size_bytes, size_u64 * 8, "Byte size should be 8x u64 size");
    }

    // ==================== RefRecord Tests ====================

    #[test]
    fn test_ref_record_bitsize() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            let bitsize = record.bitsize();
            assert!(
                matches!(bitsize, BitSize::Two | BitSize::Four),
                "Bitsize should be Two or Four"
            );
        }
    }

    #[test]
    fn test_ref_record_flag() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if let Ok(record) = reader.get(0) {
            // Smoke test: `flag()` must return without panicking. Whether it
            // is `Some` depends on the fixture's header, so no value is
            // asserted here; the previous `is_some() || is_none()` check was
            // a tautology that could never fail.
            let _flag = record.flag();
        }
    }

    #[test]
    fn test_ref_record_paired_data() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();

        if reader.is_paired() {
            if let Ok(record) = reader.get(0) {
                let xbuf = record.xbuf();
                let xlen = record.xlen();

                if xlen > 0 {
                    assert!(
                        !xbuf.is_empty(),
                        "Extended buffer should not be empty for paired"
                    );
                }
            }
        }
    }

    // ==================== Error Handling Tests ====================

    #[test]
    fn test_nonexistent_file() {
        let result = MmapReader::new("./data/nonexistent.bq");
        assert!(result.is_err(), "Should fail on nonexistent file");
    }

    #[test]
    fn test_invalid_file_format() {
        // Try to open a non-BQ file as BQ (use Cargo.toml for example)
        let result = MmapReader::new("./Cargo.toml");
        // This should either fail to open or fail validation
        if let Ok(reader) = result {
            // If it opens, try to access records (should fail or have issues)
            let num_records = reader.num_records();
            // The number might be nonsensical for invalid data
            let _ = num_records; // Just verify it doesn't panic
        }
    }

    // ==================== Multiple Records Tests ====================

    #[test]
    fn test_sequential_record_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records().min(10);

        for i in 0..num_records {
            let record = reader.get(i);
            assert!(record.is_ok(), "Should get record at index {}", i);
            assert_eq!(
                record.unwrap().index() as usize,
                i,
                "Record index mismatch at {}",
                i
            );
        }
    }

    #[test]
    fn test_random_record_access() {
        let reader = MmapReader::new(TEST_BQ_FILE).unwrap();
        let num_records = reader.num_records();

        if num_records > 10 {
            let indices = [0, 5, num_records / 2, num_records - 1];

            for &idx in &indices {
                let record = reader.get(idx);
                assert!(record.is_ok(), "Should get record at index {}", idx);
                assert_eq!(record.unwrap().index() as usize, idx);
            }
        }
    }
}