binseq/vbq/
reader.rs

1//! Reader implementation for VBINSEQ files
2//!
3//! This module provides functionality for reading sequence data from VBINSEQ files,
4//! including support for compressed blocks, quality scores, paired-end reads, and sequence headers.
5//!
6//! ## Format Changes (v0.7.0+)
7//!
8//! - **Embedded Index**: Readers now load the index from within VBQ files instead of separate `.vqi` files
9//! - **Headers Support**: Optional sequence headers/identifiers can be read from each record
10//! - **Multi-bit Encoding**: Support for reading 2-bit and 4-bit nucleotide encodings
11//! - **Extended Capacity**: u64 indexing supports files with more than 4 billion records
12//!
13//! ## Index Loading
14//!
15//! The reader automatically loads the embedded index from the end of VBQ files:
16//! 1. Seeks to the end of the file to read the index trailer
17//! 2. Validates the INDEX_END_MAGIC marker
18//! 3. Reads the index size and decompresses the embedded index
19//! 4. Uses the index for efficient random access and parallel processing
20//!
21//! ## Memory-Mapped Reading
22//!
23//! The `MmapReader` provides efficient access to large files through memory mapping:
24//! - Zero-copy access to file data
25//! - Efficient random access using the embedded index
26//! - Support for parallel processing across record ranges
27//!
28//! ## Example
29//!
30//! ```rust,no_run
31//! use binseq::vbq::MmapReader;
32//! use binseq::BinseqRecord;
33//!
34//! // Open a VBQ file (index is automatically loaded)
35//! let mut reader = MmapReader::new("example.vbq").unwrap();
36//! let mut block = reader.new_block();
37//!
38//! // Read records with headers and quality scores
39//! let mut seq_buffer = Vec::new();
40//! let mut header_buffer = Vec::new();
41//! while reader.read_block_into(&mut block).unwrap() {
42//!     for record in block.iter() {
43//!         record.decode_s(&mut seq_buffer).unwrap();
44//!         record.sheader(&mut header_buffer);
45//!         println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap());
46//!         println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap());
47//!         if !record.squal().is_empty() {
48//!             println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap());
49//!         }
50//!         seq_buffer.clear();
51//!         header_buffer.clear();
52//!     }
53//! }
54//! ```
55
56use std::fs::File;
57use std::io::Read;
58use std::ops::Range;
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61
62use bitnuc::BitSize;
63use byteorder::{ByteOrder, LittleEndian};
64use memmap2::Mmap;
65use zstd::Decoder;
66
67use super::{
68    header::{SIZE_BLOCK_HEADER, SIZE_HEADER},
69    BlockHeader, BlockIndex, BlockRange, VBinseqHeader,
70};
71use crate::vbq::index::{IndexHeader, INDEX_END_MAGIC, INDEX_HEADER_SIZE};
72use crate::{
73    error::{ReadError, Result},
74    BinseqRecord, ParallelProcessor, ParallelReader,
75};
76
77/// Calculates the number of 64-bit words needed to store a nucleotide sequence of the given length
78///
79/// Nucleotides are packed into 64-bit words with 2 bits per nucleotide (32 nucleotides per word).
80/// This function calculates how many 64-bit words are needed to encode a sequence of a given length.
81///
82/// # Parameters
83///
84/// * `len` - Length of the nucleotide sequence in basepairs
85///
86/// # Returns
87///
88/// The number of 64-bit words required to encode the sequence
89fn encoded_sequence_len(len: u64, bitsize: BitSize) -> usize {
90    match bitsize {
91        BitSize::Two => len.div_ceil(32) as usize,
92        BitSize::Four => len.div_ceil(16) as usize,
93    }
94}
95
96/// A container for a block of VBINSEQ records
97///
98/// The `RecordBlock` struct represents a single block of records read from a VBINSEQ file.
99/// It stores the raw data for multiple records in vectors, allowing efficient iteration
100/// over the records without copying memory for each record.
101///
102/// ## Format Support (v0.7.0+)
103///
104/// - Supports reading records with optional sequence headers
105/// - Handles both 2-bit and 4-bit nucleotide encodings
106/// - Supports quality scores and paired sequences
107/// - Compatible with both compressed and uncompressed blocks
108///
109/// The `RecordBlock` is reused when reading blocks sequentially from a file, with its
110/// contents being cleared and replaced with each new block that is read.
111///
112/// # Examples
113///
114/// ```rust,no_run
115/// use binseq::vbq::MmapReader;
116///
117/// let reader = MmapReader::new("example.vbq").unwrap();
118/// let mut block = reader.new_block(); // Create a block with appropriate size
119/// ```
120pub struct RecordBlock {
121    /// Bitsize of the records in the block
122    bitsize: BitSize,
123
124    /// Index of the first record in the block
125    /// This allows records to maintain their global position in the file
126    index: usize,
127
128    /// Buffer containing all record flags in the block
129    /// Each record has one flag value stored at the corresponding position
130    flags: Vec<Option<u64>>,
131
132    /// Buffer containing all sequence lengths in the block
133    /// For each record, two consecutive entries are stored: primary sequence length and extended sequence length
134    lens: Vec<u64>,
135
136    /// Buffer containing all header lengths in the block
137    /// For each record, two consecutive entries are stored: primary header length and extended header length
138    header_lengths: Vec<u64>,
139
140    /// Buffer containing all packed nucleotide sequences in the block
141    /// Nucleotides are encoded as 2-bit values (4 nucleotides per byte)
142    sequences: Vec<u64>,
143
144    /// Buffer containing all quality scores in the block
145    /// Quality scores are stored as raw bytes, one byte per nucleotide
146    qualities: Vec<u8>,
147
148    /// Buffer containing all headers in the block
149    headers: Vec<u8>,
150
151    /// Maximum size of the block in bytes
152    /// This is derived from the file header's block size field
153    block_size: usize,
154
155    /// Reusable buffer for temporary storage during decompression
156    /// Using a reusable buffer reduces memory allocations
157    rbuf: Vec<u8>,
158}
159impl RecordBlock {
160    /// Creates a new empty `RecordBlock` with the specified block size
161    ///
162    /// The block size should match the one specified in the VBINSEQ file header
163    /// for proper operation. This is typically handled automatically when using
164    /// `MmapReader::new_block()`.
165    ///
166    /// # Parameters
167    ///
168    /// * `bitsize` - Bitsize of the records in the block
169    /// * `block_size` - Maximum size of the block in bytes
170    ///
171    /// # Returns
172    ///
173    /// A new empty `RecordBlock` instance
174    #[must_use]
175    pub fn new(bitsize: BitSize, block_size: usize) -> Self {
176        Self {
177            bitsize,
178            index: 0,
179            flags: Vec::new(),
180            lens: Vec::new(),
181            header_lengths: Vec::new(),
182            sequences: Vec::new(),
183            qualities: Vec::new(),
184            headers: Vec::new(),
185            block_size,
186            rbuf: Vec::new(),
187        }
188    }
189
190    /// Returns the number of records in this block
191    ///
192    /// # Returns
193    ///
194    /// The number of records currently stored in this block
195    #[must_use]
196    pub fn n_records(&self) -> usize {
197        self.flags.len()
198    }
199
200    /// Returns an iterator over the records in this block
201    ///
202    /// The iterator yields `RefRecord` instances that provide access to the record data
203    /// without copying the underlying data.
204    ///
205    /// # Returns
206    ///
207    /// An iterator over the records in this block
208    ///
209    /// # Examples
210    ///
211    /// ```rust,no_run
212    /// use binseq::vbq::MmapReader;
213    /// use binseq::BinseqRecord;
214    ///
215    /// let mut reader = MmapReader::new("example.vbq").unwrap();
216    /// let mut block = reader.new_block();
217    /// reader.read_block_into(&mut block).unwrap();
218    ///
219    /// // Iterate over records in the block
220    /// for record in block.iter() {
221    ///     println!("Record {}", record.index());
222    /// }
223    /// ```
224    #[must_use]
225    #[allow(clippy::iter_without_into_iter)]
226    pub fn iter(&self) -> RecordBlockIter<'_> {
227        RecordBlockIter::new(self)
228    }
229
230    /// Updates the starting index of the block
231    ///
232    /// This is used internally to keep track of the global position of records
233    /// within the file, allowing each record to maintain its original index.
234    ///
235    /// # Parameters
236    ///
237    /// * `index` - The index of the first record in the block
238    fn update_index(&mut self, index: usize) {
239        self.index = index;
240    }
241
242    /// Clears all data from the block
243    ///
244    /// This method resets the block to an empty state, clearing all vectors and resetting
245    /// the index to 0. This is typically used when reusing a block for reading a new block
246    /// from a file.
247    pub fn clear(&mut self) {
248        self.index = 0;
249        self.flags.clear();
250        self.lens.clear();
251        self.header_lengths.clear();
252        self.sequences.clear();
253        self.qualities.clear();
254        self.headers.clear();
255    }
256
257    /// Ingest the bytes from a block into the record block
258    ///
259    /// This method takes a slice of bytes and processes it to extract
260    /// the records from the block. It is used when reading a block from
261    /// a file into a record block.
262    ///
263    /// This is a private method used primarily for parallel processing.
264    ///
265    /// # Parameters
266    ///
267    /// * `bytes` - A slice of bytes containing the block data
268    /// * `has_quality` - A boolean indicating whether the block contains quality scores
269    /// * `has_header` - A boolean indicating whether the block contains headers
270    ///
271    /// # Returns
272    ///
273    /// A `Result` indicating success or an error
274    fn ingest_bytes(&mut self, bytes: &[u8], has_quality: bool, has_header: bool, has_flags: bool) {
275        let mut pos = 0;
276        loop {
277            // Read the flag and advance the position
278            let flag = if has_flags {
279                if pos + 24 > bytes.len() {
280                    break;
281                }
282                let flag = LittleEndian::read_u64(&bytes[pos..pos + 8]);
283                pos += 8;
284                Some(flag)
285            } else {
286                if pos + 16 > bytes.len() {
287                    break;
288                }
289                None
290            };
291
292            // Read the primary length and advance the position
293            let slen = LittleEndian::read_u64(&bytes[pos..pos + 8]);
294            pos += 8;
295
296            // Read the extended length and advance the position
297            let xlen = LittleEndian::read_u64(&bytes[pos..pos + 8]);
298            pos += 8;
299
300            // No more records in the block
301            if slen == 0 {
302                // It is possible to end up here if the block is not full
303                // In this case the flag and the length are both zero
304                // and effectively blank but initialized memory.
305                break;
306            }
307
308            // Add the record to the block
309            self.flags.push(flag);
310            self.lens.push(slen);
311            self.lens.push(xlen);
312
313            // Add the primary sequence to the block
314            let mut seq = [0u8; 8];
315            for _ in 0..encoded_sequence_len(slen, self.bitsize) {
316                seq.copy_from_slice(&bytes[pos..pos + 8]);
317                self.sequences.push(LittleEndian::read_u64(&seq));
318                pos += 8;
319            }
320
321            // Add the primary quality score to the block
322            if has_quality {
323                let qual_buffer = &bytes[pos..pos + slen as usize];
324                self.qualities.extend_from_slice(qual_buffer);
325                pos += slen as usize;
326            }
327
328            // Add the primary header to the block
329            if has_header {
330                let s_header_length = LittleEndian::read_u64(&bytes[pos..pos + 8]);
331                self.header_lengths.push(s_header_length);
332                pos += 8; // Fixed: advance by 8 bytes for u64
333
334                let s_header_buffer = &bytes[pos..pos + s_header_length as usize];
335                self.headers.extend_from_slice(s_header_buffer);
336                pos += s_header_length as usize;
337            }
338
339            // Add the extended sequence to the block
340            for _ in 0..encoded_sequence_len(xlen, self.bitsize) {
341                seq.copy_from_slice(&bytes[pos..pos + 8]);
342                self.sequences.push(LittleEndian::read_u64(&seq));
343                pos += 8;
344            }
345
346            // Add the extended quality score to the block
347            if has_quality {
348                let qual_buffer = &bytes[pos..pos + xlen as usize];
349                self.qualities.extend_from_slice(qual_buffer);
350                pos += xlen as usize;
351            }
352
353            // Add the extended header to the block
354            if has_header && xlen > 0 {
355                let x_header_length = LittleEndian::read_u64(&bytes[pos..pos + 8]);
356                self.header_lengths.push(x_header_length);
357                pos += 8; // Fixed: advance by 8 bytes for u64
358
359                let x_header_buffer = &bytes[pos..pos + x_header_length as usize];
360                self.headers.extend_from_slice(x_header_buffer);
361                pos += x_header_length as usize;
362            }
363        }
364    }
365
366    fn ingest_compressed_bytes(
367        &mut self,
368        bytes: &[u8],
369        has_quality: bool,
370        has_header: bool,
371        has_flags: bool,
372    ) -> Result<()> {
373        let mut decoder = Decoder::with_buffer(bytes)?;
374
375        let mut pos = 0;
376        loop {
377            let (flag, slen, xlen) = if has_flags {
378                // Check that we have enough bytes to at least read the flag
379                // and lengths. If not, break out of the loop.
380                if pos + 24 > self.block_size {
381                    break;
382                }
383
384                // Pull the preambles out of the compressed block and advance the position
385                let mut preamble = [0u8; 24];
386                decoder.read_exact(&mut preamble)?;
387                pos += 24;
388
389                // Read the flag + lengths
390                let flag = LittleEndian::read_u64(&preamble[0..8]);
391                let slen = LittleEndian::read_u64(&preamble[8..16]);
392                let xlen = LittleEndian::read_u64(&preamble[16..24]);
393                (Some(flag), slen, xlen)
394            } else {
395                // Check that we have enough bytes to at least read the
396                // lengths. If not, break out of the loop.
397                if pos + 16 > self.block_size {
398                    break;
399                }
400
401                // Pull the preambles out of the compressed block and advance the position
402                let mut preamble = [0u8; 16];
403                decoder.read_exact(&mut preamble)?;
404                pos += 16;
405
406                // Read the flag + lengths
407                let slen = LittleEndian::read_u64(&preamble[0..8]);
408                let xlen = LittleEndian::read_u64(&preamble[8..16]);
409                (None, slen, xlen)
410            };
411
412            // No more records in the block
413            if slen == 0 {
414                // It is possible to end up here if the block is not full
415                // In this case the flag and the length are both zero
416                // and effectively blank but initialized memory.
417                break;
418            }
419
420            // Add the record to the block
421            self.flags.push(flag);
422            self.lens.push(slen);
423            self.lens.push(xlen);
424
425            // Read the primary sequence and advance the position
426            let schunk = encoded_sequence_len(slen, self.bitsize);
427            let schunk_bytes = schunk * 8;
428            self.rbuf.resize(schunk_bytes, 0);
429            decoder.read_exact(&mut self.rbuf[0..schunk_bytes])?;
430            for chunk in self.rbuf.chunks_exact(8) {
431                let seq_part = LittleEndian::read_u64(chunk);
432                self.sequences.push(seq_part);
433            }
434            self.rbuf.clear();
435            pos += schunk_bytes;
436
437            // Add the primary quality score to the block
438            if has_quality {
439                self.rbuf.resize(slen as usize, 0);
440                decoder.read_exact(&mut self.rbuf[0..slen as usize])?;
441                self.qualities.extend_from_slice(&self.rbuf);
442                self.rbuf.clear();
443                pos += slen as usize;
444            }
445
446            // Add the primary header to the block
447            if has_header {
448                self.rbuf.resize(8, 0);
449                decoder.read_exact(&mut self.rbuf[0..8])?;
450                let s_header_length = LittleEndian::read_u64(&self.rbuf);
451                self.header_lengths.push(s_header_length);
452                self.rbuf.clear();
453                pos += 8;
454
455                self.rbuf.resize(s_header_length as usize, 0);
456                decoder.read_exact(&mut self.rbuf[0..s_header_length as usize])?;
457                self.headers.extend_from_slice(&self.rbuf);
458                self.rbuf.clear();
459                pos += s_header_length as usize;
460            }
461
462            // Read the extended sequence and advance the position
463            let xchunk = encoded_sequence_len(xlen, self.bitsize);
464            let xchunk_bytes = xchunk * 8;
465            self.rbuf.resize(xchunk_bytes, 0);
466            decoder.read_exact(&mut self.rbuf[0..xchunk_bytes])?;
467            for chunk in self.rbuf.chunks_exact(8) {
468                let seq_part = LittleEndian::read_u64(chunk);
469                self.sequences.push(seq_part);
470            }
471            self.rbuf.clear();
472            pos += xchunk_bytes;
473
474            // Add the extended quality score to the block
475            if has_quality {
476                self.rbuf.resize(xlen as usize, 0);
477                decoder.read_exact(&mut self.rbuf[0..xlen as usize])?;
478                self.qualities.extend_from_slice(&self.rbuf);
479                self.rbuf.clear();
480                pos += xlen as usize;
481            }
482
483            // Add the extended header to the block
484            if has_header && xlen > 0 {
485                self.rbuf.resize(8, 0);
486                decoder.read_exact(&mut self.rbuf[0..8])?;
487                let x_header_length = LittleEndian::read_u64(&self.rbuf);
488                self.header_lengths.push(x_header_length);
489                self.rbuf.clear();
490                pos += 8;
491
492                self.rbuf.resize(x_header_length as usize, 0);
493                decoder.read_exact(&mut self.rbuf[0..x_header_length as usize])?;
494                self.headers.extend_from_slice(&self.rbuf);
495                self.rbuf.clear();
496                pos += x_header_length as usize;
497            }
498        }
499        Ok(())
500    }
501}
502
503pub struct RecordBlockIter<'a> {
504    block: &'a RecordBlock,
505    /// Record position in the block
506    rpos: usize,
507    /// Encoded sequence position in the block
508    epos: usize,
509    /// Header position in the block
510    hpos: usize,
511    /// Quality position in the block
512    qpos: usize,
513}
514impl<'a> RecordBlockIter<'a> {
515    #[must_use]
516    pub fn new(block: &'a RecordBlock) -> Self {
517        Self {
518            block,
519            rpos: 0,
520            epos: 0,
521            hpos: 0,
522            qpos: 0,
523        }
524    }
525}
526impl<'a> Iterator for RecordBlockIter<'a> {
527    type Item = RefRecord<'a>;
528
529    fn next(&mut self) -> Option<Self::Item> {
530        if self.rpos == self.block.n_records() {
531            return None;
532        }
533
534        let index = (self.block.index + self.rpos) as u64;
535        let flag = self.block.flags[self.rpos];
536        let slen = self.block.lens[2 * self.rpos];
537        let xlen = self.block.lens[(2 * self.rpos) + 1];
538        let schunk = encoded_sequence_len(slen, self.block.bitsize);
539        let xchunk = encoded_sequence_len(xlen, self.block.bitsize);
540
541        // Handle sequences
542        let s_seq = &self.block.sequences[self.epos..self.epos + schunk];
543        self.epos += schunk;
544
545        let x_seq = &self.block.sequences[self.epos..self.epos + xchunk];
546        self.epos += xchunk;
547
548        // Handle quality scores (separate position tracking)
549        let s_qual = if self.block.qualities.is_empty() {
550            &[]
551        } else {
552            let qual = &self.block.qualities[self.qpos..self.qpos + slen as usize];
553            self.qpos += slen as usize;
554            qual
555        };
556
557        let x_qual = if self.block.qualities.is_empty() {
558            &[]
559        } else {
560            let qual = &self.block.qualities[self.qpos..self.qpos + xlen as usize];
561            self.qpos += xlen as usize;
562            qual
563        };
564
565        // Handle headers (separate position tracking)
566        let header_idx = if xlen > 0 { 2 * self.rpos } else { self.rpos };
567        let mut shlen = 0;
568        let s_header = if self.block.headers.is_empty() {
569            &[]
570        } else {
571            // Get header length
572            shlen = self.block.header_lengths[header_idx];
573
574            // Extract header data
575            let header = &self.block.headers[self.hpos..self.hpos + shlen as usize];
576            self.hpos += shlen as usize;
577            header
578        };
579
580        let mut xhlen = 0;
581        let x_header = if self.block.headers.is_empty() || xlen == 0 {
582            &[]
583        } else {
584            xhlen = self.block.header_lengths[header_idx + 1];
585            let header = &self.block.headers[self.hpos..self.hpos + xhlen as usize];
586            self.hpos += xhlen as usize;
587            header
588        };
589
590        // update record position
591        self.rpos += 1;
592
593        Some(RefRecord::new(
594            self.block.bitsize,
595            index,
596            flag,
597            slen,
598            xlen,
599            s_seq,
600            x_seq,
601            s_qual,
602            x_qual,
603            shlen,
604            s_header,
605            xhlen,
606            x_header,
607        ))
608    }
609}
610
611/// A reference to a record in a VBINSEQ file
612///
613/// `RefRecord` provides a lightweight view into a record within a `RecordBlock`.
614/// It holds references to the underlying data rather than owning it, making it
615/// efficient to iterate through records without copying data.
616///
617/// Each record contains a primary sequence (accessible via `sbuf` and related methods)
618/// and optionally a paired/extended sequence (accessible via `xbuf` and related methods).
619/// Both sequences may also have associated quality scores.
620///
621/// # Examples
622///
623/// ```rust,no_run
624/// use binseq::vbq::MmapReader;
625/// use binseq::BinseqRecord;
626///
627/// let mut reader = MmapReader::new("example.vbq").unwrap();
628/// let mut block = reader.new_block();
629/// reader.read_block_into(&mut block).unwrap();
630///
631/// let mut sequence = Vec::new();
632///
633/// for record in block.iter() {
634///     // Get record metadata
635///     println!("Record {}, flag: {:?}", record.index(), record.flag());
636///
637///     // Decode the primary sequence
638///     record.decode_s(&mut sequence).unwrap();
639///     println!("Sequence: {}", std::str::from_utf8(&sequence).unwrap());
640///     sequence.clear();
641///
642///     // If this is a paired record, decode the paired sequence
643///     if record.is_paired() {
644///         record.decode_x(&mut sequence).unwrap();
645///         println!("Paired sequence: {}", std::str::from_utf8(&sequence).unwrap());
646///         sequence.clear();
647///     }
648///
649///     // Access quality scores if available
650///     if record.has_quality() {
651///         println!("Quality scores available");
652///     }
653/// }
654/// ```
655pub struct RefRecord<'a> {
656    /// Bitsize of the record
657    bitsize: BitSize,
658
659    /// Global index of this record within the file
660    index: u64,
661
662    /// Flag value for this record (can be used for custom metadata)
663    flag: Option<u64>,
664
665    /// Length of the primary sequence in nucleotides
666    slen: u64,
667
668    /// Length of the extended/paired sequence in nucleotides (0 if not paired)
669    xlen: u64,
670
671    /// Buffer containing the encoded primary nucleotide sequence
672    sbuf: &'a [u64],
673
674    /// Buffer containing the encoded extended/paired nucleotide sequence
675    xbuf: &'a [u64],
676
677    /// Quality scores for the primary sequence (empty if quality scores not present)
678    squal: &'a [u8],
679
680    /// Quality scores for the extended/paired sequence (empty if not paired or no quality)
681    xqual: &'a [u8],
682
683    /// Length of the header for the primary sequence in bytes
684    shlen: u64,
685
686    /// Header for the record
687    sheader: &'a [u8],
688
689    /// Length of the header for the extended/paired sequence in bytes
690    xhlen: u64,
691
692    /// Header for the extended/paired sequence (empty if not paired)
693    xheader: &'a [u8],
694}
695impl<'a> RefRecord<'a> {
696    #[allow(clippy::too_many_arguments)]
697    #[must_use]
698    pub fn new(
699        bitsize: BitSize,
700        index: u64,
701        flag: Option<u64>,
702        slen: u64,
703        xlen: u64,
704        sbuf: &'a [u64],
705        xbuf: &'a [u64],
706        squal: &'a [u8],
707        xqual: &'a [u8],
708        shlen: u64,
709        sheader: &'a [u8],
710        xhlen: u64,
711        xheader: &'a [u8],
712    ) -> Self {
713        Self {
714            bitsize,
715            index,
716            flag,
717            slen,
718            xlen,
719            sbuf,
720            xbuf,
721            squal,
722            xqual,
723            shlen,
724            sheader,
725            xhlen,
726            xheader,
727        }
728    }
729}
730
731impl RefRecord<'_> {
732    pub fn shlen(&self) -> u64 {
733        self.shlen
734    }
735    pub fn xhlen(&self) -> u64 {
736        self.xhlen
737    }
738}
739
740impl BinseqRecord for RefRecord<'_> {
741    fn bitsize(&self) -> BitSize {
742        self.bitsize
743    }
744    fn index(&self) -> u64 {
745        self.index
746    }
747    fn sheader(&self, buffer: &mut Vec<u8>) {
748        buffer.clear();
749        if self.sheader.is_empty() {
750            buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes());
751        } else {
752            buffer.extend_from_slice(self.sheader);
753        }
754    }
755    fn xheader(&self, buffer: &mut Vec<u8>) {
756        buffer.clear();
757        if self.sheader.is_empty() {
758            buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes());
759        } else {
760            buffer.extend_from_slice(self.xheader);
761        }
762    }
763    fn flag(&self) -> Option<u64> {
764        self.flag
765    }
766    fn slen(&self) -> u64 {
767        self.slen
768    }
769    fn xlen(&self) -> u64 {
770        self.xlen
771    }
772    fn sbuf(&self) -> &[u64] {
773        self.sbuf
774    }
775    fn xbuf(&self) -> &[u64] {
776        self.xbuf
777    }
778    fn squal(&self) -> &[u8] {
779        self.squal
780    }
781    fn xqual(&self) -> &[u8] {
782        self.xqual
783    }
784}
785
786/// Memory-mapped reader for VBINSEQ files
787///
788/// [`MmapReader`] provides efficient, memory-mapped access to VBINSEQ files. It allows
789/// sequential reading of record blocks and supports parallel processing of records.
790///
791/// ## Format Support (v0.7.0+)
792///
793/// - **Embedded Index**: Automatically loads index from within VBQ files
794/// - **Headers Support**: Reads optional sequence headers/identifiers from records
795/// - **Multi-bit Encoding**: Supports both 2-bit and 4-bit nucleotide encodings
796/// - **Extended Capacity**: u64 indexing supports files with more than 4 billion records
797///
798/// Memory mapping allows the operating system to lazily load file contents as needed,
799/// which can be more efficient than standard file I/O, especially for large files.
800///
801/// The [`MmapReader`] is designed to be used in a multi-threaded environment, and it
802/// is built around [`RecordBlock`]s which are the units of data in a VBINSEQ file.
803/// Each one would be held by a separate thread and would load data from the shared
804/// [`MmapReader`] through the [`MmapReader::read_block_into`] method. However, they can
805/// also be used in a single-threaded environment for sequential processing.
806///
807/// Each [`RecordBlock`] contains a [`BlockHeader`] and is used to access [`RefRecord`]s
808/// which implement the [`BinseqRecord`] trait.
809///
810/// ## Index Loading
811///
812/// The reader automatically loads the embedded index by:
813/// 1. Reading the index trailer from the end of the file
814/// 2. Validating the INDEX_END_MAGIC marker
815/// 3. Decompressing the embedded index data
816/// 4. Using the index for efficient random access and parallel processing
817///
818/// # Examples
819///
820/// ```
821/// use binseq::vbq::MmapReader;
822/// use binseq::{BinseqRecord, Result};
823///
824/// fn main() -> Result<()> {
825///     let path = "./data/subset.vbq";
826///     let mut reader = MmapReader::new(path)?; // Index loaded automatically
827///
828///     // Create buffers for sequence data and headers
829///     let mut seq_buffer = Vec::new();
830///     let mut header_buffer = Vec::new();
831///     let mut block = reader.new_block();
832///
833///     // Read blocks sequentially
834///     while reader.read_block_into(&mut block)? {
835///         println!("Read a block with {} records", block.n_records());
836///         for record in block.iter() {
837///             // Decode sequence and header
838///             record.decode_s(&mut seq_buffer)?;
839///             record.sheader(&mut header_buffer);
840///
841///             println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap_or(""));
842///             println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap_or(""));
843///
844///             seq_buffer.clear();
845///             header_buffer.clear();
846///         }
847///     }
848///     Ok(())
849/// }
850/// ```
851pub struct MmapReader {
852    /// Path to the VBINSEQ file
853    path: PathBuf,
854
855    /// Memory-mapped file contents for efficient access
856    mmap: Arc<Mmap>,
857
858    /// Parsed header information from the file
859    header: VBinseqHeader,
860
861    /// Current cursor position in the file (in bytes)
862    pos: usize,
863
864    /// Total number of records read from the file so far
865    total: usize,
866}
867impl MmapReader {
868    /// Creates a new `MmapReader` for a VBINSEQ file
869    ///
870    /// This method opens the specified file, memory-maps its contents, reads the
871    /// VBINSEQ header information, and loads the embedded index. The reader is positioned
872    /// at the beginning of the first record block after the header.
873    ///
874    /// ## Index Loading (v0.7.0+)
875    ///
876    /// The embedded index is automatically loaded from the end of the file. For legacy
877    /// files with separate `.vqi` index files, the index is automatically migrated to
878    /// the embedded format.
879    ///
880    /// # Parameters
881    ///
882    /// * `path` - Path to the VBINSEQ file to open
883    ///
884    /// # Returns
885    ///
886    /// A new `MmapReader` instance if successful
887    ///
888    /// # Errors
889    ///
890    /// * `ReadError::InvalidFileType` if the path doesn't point to a regular file
891    /// * I/O errors if the file can't be opened or memory-mapped
892    /// * Header validation errors if the file doesn't contain a valid VBINSEQ header
893    ///
894    /// # Examples
895    ///
896    /// ```rust,no_run
897    /// use binseq::vbq::MmapReader;
898    ///
899    /// let reader = MmapReader::new("path/to/file.vbq").unwrap();
900    /// ```
901    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
902        // Verify it's a regular file before attempting to map
903        let file = File::open(&path)?;
904        if !file.metadata()?.is_file() {
905            return Err(ReadError::InvalidFileType.into());
906        }
907
908        // Safety: The file is open and won't be modified while mapped
909        let mmap = unsafe { Mmap::map(&file)? };
910
911        // Read header from mapped memory
912        let header = {
913            let mut header_bytes = [0u8; SIZE_HEADER];
914            header_bytes.copy_from_slice(&mmap[..SIZE_HEADER]);
915            VBinseqHeader::from_bytes(&header_bytes)?
916        };
917
918        Ok(Self {
919            path: PathBuf::from(path.as_ref()),
920            mmap: Arc::new(mmap),
921            header,
922            pos: SIZE_HEADER,
923            total: 0,
924        })
925    }
926
927    /// Creates a new empty record block with the appropriate size for this file
928    ///
929    /// This creates a `RecordBlock` with a block size matching the one specified in the
930    /// file's header, ensuring it will be able to hold a full block of records.
931    ///
932    /// # Returns
933    ///
934    /// A new empty `RecordBlock` instance sized appropriately for this file
935    ///
936    /// # Examples
937    ///
938    /// ```rust,no_run
939    /// use binseq::vbq::MmapReader;
940    ///
941    /// let reader = MmapReader::new("example.vbq").unwrap();
942    /// let mut block = reader.new_block();
943    /// ```
944    #[must_use]
945    pub fn new_block(&self) -> RecordBlock {
946        RecordBlock::new(self.header.bits, self.header.block as usize)
947    }
948
949    /// Returns the path where the index file would be located
950    ///
951    /// The index file is used for random access to blocks and has the same path as
952    /// the VBINSEQ file with the ".vqi" extension appended.
953    ///
954    /// # Returns
955    ///
956    /// The path where the index file would be located
957    ///
958    /// # Examples
959    ///
960    /// ```
961    /// use binseq::vbq::MmapReader;
962    /// use binseq::Result;
963    ///
964    /// fn main() -> Result<()> {
965    ///     let path = "./data/subset.vbq";
966    ///     let reader = MmapReader::new(path)?;
967    ///     let index_path = reader.index_path();
968    ///     assert_eq!(index_path.to_str(), Some("./data/subset.vbq.vqi"));
969    ///     Ok(())
970    /// }
971    /// ```
972    #[must_use]
973    pub fn index_path(&self) -> PathBuf {
974        let mut p = self.path.as_os_str().to_owned();
975        p.push(".vqi");
976        p.into()
977    }
978
979    /// Returns a copy of the file's header information
980    ///
981    /// The header contains information about the file format, including whether
982    /// quality scores are included, whether blocks are compressed, and whether
983    /// records are paired.
984    ///
985    /// # Returns
986    ///
987    /// A copy of the file's `VBinseqHeader`
988    #[must_use]
989    pub fn header(&self) -> VBinseqHeader {
990        self.header
991    }
992
993    /// Checks if the file contains paired records
994    #[must_use]
995    pub fn is_paired(&self) -> bool {
996        self.header.is_paired()
997    }
998
999    /// Fills an existing `RecordBlock` with the next block of records from the file
1000    ///
1001    /// This method reads the next block of records from the current position in the file
1002    /// and populates the provided `RecordBlock` with the data. The block is cleared and reused
1003    /// to avoid unnecessary memory allocations. This is the primary method for sequential
1004    /// reading of VBINSEQ files.
1005    ///
1006    /// The method automatically handles decompression if the file was written with
1007    /// compression enabled and updates the total record count as it progresses through the file.
1008    ///
1009    /// # Parameters
1010    ///
1011    /// * `block` - A mutable reference to a `RecordBlock` to be filled with data
1012    ///
1013    /// # Returns
1014    ///
1015    /// * `Ok(true)` - If a block was successfully read
1016    /// * `Ok(false)` - If the end of the file was reached (no more blocks)
1017    /// * `Err(_)` - If an error occurred during reading
1018    ///
1019    /// # Examples
1020    ///
1021    /// ```rust,no_run
1022    /// use binseq::vbq::MmapReader;
1023    /// use binseq::BinseqRecord;
1024    /// use std::io::Write;
1025    ///
1026    /// let mut reader = MmapReader::new("example.vbq").unwrap();
1027    /// let mut block = reader.new_block();
1028    /// let mut sequence_buffer = Vec::new();
1029    ///
1030    /// // Read blocks until the end of file
1031    /// while reader.read_block_into(&mut block).unwrap() {
1032    ///     println!("Read block with {} records", block.n_records());
1033    ///
1034    ///     // Process each record
1035    ///     for record in block.iter() {
1036    ///         // Decode the nucleotide sequence
1037    ///         record.decode_s(&mut sequence_buffer).unwrap();
1038    ///
1039    ///         // Do something with the sequence
1040    ///         println!("Record {}: length {}", record.index(), sequence_buffer.len());
1041    ///         sequence_buffer.clear();
1042    ///     }
1043    /// }
1044    /// ```
1045    pub fn read_block_into(&mut self, block: &mut RecordBlock) -> Result<bool> {
1046        // Clear the block
1047        block.clear();
1048
1049        // Validate the next block header is within bounds and present
1050        if self.pos + SIZE_BLOCK_HEADER > self.mmap.len() {
1051            return Ok(false);
1052        }
1053        let mut header_bytes = [0u8; SIZE_BLOCK_HEADER];
1054        header_bytes.copy_from_slice(&self.mmap[self.pos..self.pos + SIZE_BLOCK_HEADER]);
1055        let header = match BlockHeader::from_bytes(&header_bytes) {
1056            Ok(header) => {
1057                self.pos += SIZE_BLOCK_HEADER;
1058                header
1059            }
1060            // Bytes left - but not a BlockHeader - could be the index
1061            Err(e) => {
1062                let mut index_header_bytes = [0u8; INDEX_HEADER_SIZE];
1063                index_header_bytes
1064                    .copy_from_slice(&self.mmap[self.pos..self.pos + INDEX_HEADER_SIZE]);
1065                if IndexHeader::from_bytes(&index_header_bytes).is_ok() {
1066                    // Expected end of file
1067                    return Ok(false);
1068                }
1069                return Err(e.into());
1070            }
1071        };
1072
1073        // Read the block contents
1074        let rbound = if self.header.compressed {
1075            header.size as usize
1076        } else {
1077            self.header.block as usize
1078        };
1079        if self.pos + rbound > self.mmap.len() {
1080            return Err(ReadError::UnexpectedEndOfFile(self.pos).into());
1081        }
1082        let block_buffer = &self.mmap[self.pos..self.pos + rbound];
1083        if self.header.compressed {
1084            block.ingest_compressed_bytes(
1085                block_buffer,
1086                self.header.qual,
1087                self.header.headers,
1088                self.header.flags,
1089            )?;
1090        } else {
1091            block.ingest_bytes(
1092                block_buffer,
1093                self.header.qual,
1094                self.header.headers,
1095                self.header.flags,
1096            );
1097        }
1098
1099        // Update the block index
1100        block.update_index(self.total);
1101
1102        self.pos += rbound;
1103        self.total += header.records as usize;
1104
1105        Ok(true)
1106    }
1107
1108    /// Loads or creates the block index for this VBINSEQ file
1109    ///
1110    /// The block index provides metadata about each block in the file, enabling
1111    /// random access to blocks and parallel processing. This method first attempts to
1112    /// load an existing index file. If the index doesn't exist or doesn't match the
1113    /// current file, it automatically generates a new index from the VBINSEQ file
1114    /// and saves it for future use.
1115    ///
1116    /// # Returns
1117    ///
1118    /// The loaded or newly created `BlockIndex` if successful
1119    ///
1120    /// # Errors
1121    ///
1122    /// * File I/O errors when reading or creating the index
1123    /// * Parsing errors if the VBINSEQ file has invalid format
1124    /// * Other index-related errors that cannot be resolved by creating a new index
1125    ///
1126    /// # Examples
1127    ///
1128    /// ```rust,no_run
1129    /// use binseq::vbq::MmapReader;
1130    ///
1131    /// let reader = MmapReader::new("example.vbq").unwrap();
1132    ///
1133    /// // Load the index file (or create if it doesn't exist)
1134    /// let index = reader.load_index().unwrap();
1135    ///
1136    /// // Use the index to get information about the file
1137    /// println!("Number of blocks: {}", index.n_blocks());
1138    /// ```
1139    ///
1140    /// # Notes
1141    ///
1142    /// The index file is stored with the same path as the VBINSEQ file but with a ".vqi"
1143    /// extension appended. This allows for reusing the index across multiple runs,
1144    /// which can significantly improve startup performance for large files.
1145    pub fn load_index(&self) -> Result<BlockIndex> {
1146        let start_pos_magic = self.mmap.len() - 8;
1147        let start_pos_index_size = start_pos_magic - 8;
1148
1149        // Validate the magic number
1150        let magic = LittleEndian::read_u64(&self.mmap[start_pos_magic..]);
1151        if magic != INDEX_END_MAGIC {
1152            return Err(ReadError::MissingIndexEndMagic.into());
1153        }
1154
1155        // Get the index size
1156        let index_size = LittleEndian::read_u64(&self.mmap[start_pos_index_size..start_pos_magic]);
1157
1158        // Determine the start position of the index bytes
1159        let start_pos_index = start_pos_index_size - index_size as usize;
1160
1161        // Slice into the index bytes
1162        let index_bytes = &self.mmap[start_pos_index..start_pos_index_size];
1163
1164        // Build the index from the bytes
1165        BlockIndex::from_bytes(index_bytes)
1166    }
1167
1168    pub fn num_records(&self) -> Result<usize> {
1169        let index = self.load_index()?;
1170        Ok(index.num_records())
1171    }
1172}
1173
1174impl ParallelReader for MmapReader {
1175    /// Processes all records in the file in parallel using multiple threads
1176    ///
1177    /// This method provides efficient parallel processing of VBINSEQ files by distributing
1178    /// blocks across multiple worker threads. The file's block structure is leveraged to divide
1179    /// the work evenly without requiring thread synchronization during processing, which leads
1180    /// to near-linear scaling with the number of threads.
1181    ///
1182    /// The method automatically loads or creates an index file to identify block boundaries,
1183    /// then distributes the blocks among the requested number of threads. Each thread processes
1184    /// its assigned blocks sequentially, but multiple blocks are processed in parallel across
1185    /// threads.
1186    ///
1187    /// # Type Parameters
1188    ///
1189    /// * `P` - A type that implements the `ParallelProcessor` trait, which defines how records are processed
1190    ///
1191    /// # Parameters
1192    ///
1193    /// * `self` - Consumes the reader, as it will be used across multiple threads
1194    /// * `processor` - An instance of a type implementing `ParallelProcessor` that will be cloned for each thread
1195    /// * `num_threads` - Number of worker threads to use for processing
1196    ///
1197    /// # Returns
1198    ///
1199    /// * `Ok(())` - If all records were successfully processed
1200    /// * `Err(_)` - If an error occurs during processing
1201    ///
1202    /// # Examples
1203    ///
1204    /// ```rust,no_run
1205    /// use binseq::vbq::{MmapReader, RefRecord};
1206    /// use binseq::{ParallelProcessor, ParallelReader, BinseqRecord, Result};
1207    /// use std::sync::atomic::{AtomicUsize, Ordering};
1208    /// use std::sync::Arc;
1209    ///
1210    /// // Create a simple processor that counts records
1211    /// struct RecordCounter {
1212    ///     count: Arc<AtomicUsize>,
1213    ///     thread_id: usize,
1214    /// }
1215    ///
1216    /// impl RecordCounter {
1217    ///     fn new() -> Self {
1218    ///         Self {
1219    ///             count: Arc::new(AtomicUsize::new(0)),
1220    ///             thread_id: 0,
1221    ///         }
1222    ///     }
1223    ///
1224    ///     fn total_count(&self) -> usize {
1225    ///         self.count.load(Ordering::Relaxed)
1226    ///     }
1227    /// }
1228    ///
1229    /// impl Clone for RecordCounter {
1230    ///     fn clone(&self) -> Self {
1231    ///         Self {
1232    ///             count: Arc::clone(&self.count),
1233    ///             thread_id: 0,
1234    ///         }
1235    ///     }
1236    /// }
1237    ///
1238    /// impl ParallelProcessor for RecordCounter {
1239    ///     fn process_record<R: BinseqRecord>(&mut self, _record: R) -> Result<()> {
1240    ///         self.count.fetch_add(1, Ordering::Relaxed);
1241    ///         Ok(())
1242    ///     }
1243    ///
1244    ///     fn on_batch_complete(&mut self) -> Result<()> {
1245    ///         // Optional: perform actions after each block is processed
1246    ///         Ok(())
1247    ///     }
1248    ///
1249    ///     fn set_tid(&mut self, tid: usize) {
1250    ///         self.thread_id = tid;
1251    ///     }
1252    /// }
1253    ///
1254    /// // Use the processor with a VBINSEQ file
1255    /// let reader = MmapReader::new("example.vbq").unwrap();
1256    /// let counter = RecordCounter::new();
1257    ///
1258    /// // Process the file with 4 threads
1259    /// reader.process_parallel(counter.clone(), 4).unwrap();
1260    ///
1261    /// // Get the total number of records processed
1262    /// println!("Total records: {}", counter.total_count());
1263    /// ```
1264    ///
1265    /// # Notes
1266    ///
1267    /// * The `ParallelProcessor` instance is cloned for each worker thread, so any shared state
1268    ///   should be wrapped in thread-safe containers like `Arc`.
1269    /// * The `set_tid` method is called with a unique thread ID before processing begins, which
1270    ///   can be used to distinguish between worker threads.
1271    /// * This method consumes the reader (takes ownership), as it's distributed across threads.
1272    fn process_parallel<P: ParallelProcessor + Clone + 'static>(
1273        self,
1274        processor: P,
1275        num_threads: usize,
1276    ) -> Result<()> {
1277        let num_records = self.num_records()?;
1278        self.process_parallel_range(processor, num_threads, 0..num_records)
1279    }
1280
1281    /// Process records in parallel within a specified range
1282    ///
1283    /// This method allows parallel processing of a subset of records within the file,
1284    /// defined by a start and end index. The method maps the record range to the
1285    /// appropriate blocks and processes only the records within the specified range.
1286    ///
1287    /// # Arguments
1288    ///
1289    /// * `processor` - The processor to use for each record
1290    /// * `num_threads` - The number of threads to spawn
1291    /// * `start` - The starting record index (inclusive)
1292    /// * `end` - The ending record index (exclusive)
1293    ///
1294    /// # Type Parameters
1295    ///
1296    /// * `P` - A type that implements `ParallelProcessor` and can be cloned
1297    ///
1298    /// # Returns
1299    ///
1300    /// * `Ok(())` - If all records were processed successfully
1301    /// * `Err(Error)` - If an error occurred during processing
1302    fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
1303        self,
1304        processor: P,
1305        num_threads: usize,
1306        range: Range<usize>,
1307    ) -> Result<()> {
1308        // Calculate the number of threads to use
1309        let num_threads = if num_threads == 0 {
1310            num_cpus::get()
1311        } else {
1312            num_threads.min(num_cpus::get())
1313        };
1314
1315        // Generate or load the index first
1316        let index = self.load_index()?;
1317
1318        // Validate range
1319        let total_records = index.num_records();
1320        if range.start >= total_records || range.end > total_records || range.start >= range.end {
1321            return Ok(()); // Nothing to process or invalid range
1322        }
1323
1324        // Find blocks that contain records in the specified range
1325        let relevant_blocks = index
1326            .ranges()
1327            .iter()
1328            .filter(|r| {
1329                let iv_start = r.cumulative_records as usize;
1330                let iv_end = (r.cumulative_records + r.block_records as u64) as usize;
1331                iv_start < range.end && iv_end > range.start
1332            })
1333            .copied()
1334            .collect::<Vec<_>>();
1335
1336        if relevant_blocks.is_empty() {
1337            return Ok(()); // No relevant blocks
1338        }
1339
1340        // Calculate block assignments for threads
1341        let blocks_per_thread = relevant_blocks.len().div_ceil(num_threads);
1342
1343        // Create shared resources
1344        let mmap = Arc::clone(&self.mmap);
1345        let header = self.header;
1346
1347        // Spawn worker threads
1348        let mut handles = Vec::new();
1349
1350        for thread_id in 0..num_threads {
1351            // Calculate this thread's block range
1352            let start_block_idx = thread_id * blocks_per_thread;
1353            let end_block_idx =
1354                std::cmp::min((thread_id + 1) * blocks_per_thread, relevant_blocks.len());
1355
1356            if start_block_idx >= relevant_blocks.len() {
1357                continue;
1358            }
1359
1360            let mmap = Arc::clone(&mmap);
1361            let mut proc = processor.clone();
1362            proc.set_tid(thread_id);
1363
1364            // Get block ranges for this thread
1365            let thread_blocks: Vec<BlockRange> =
1366                relevant_blocks[start_block_idx..end_block_idx].to_vec();
1367
1368            let handle = std::thread::spawn(move || -> Result<()> {
1369                // Create block to reuse for processing (within thread)
1370                let mut record_block = RecordBlock::new(header.bits, header.block as usize);
1371
1372                // Process each assigned block
1373                for block_range in thread_blocks {
1374                    // Clear the block for reuse
1375                    record_block.clear();
1376
1377                    // Skip the block header to get to data
1378                    let block_start = block_range.start_offset as usize + SIZE_BLOCK_HEADER;
1379                    let block_data = &mmap[block_start..block_start + block_range.len as usize];
1380
1381                    // Ingest data according to the compression setting
1382                    if header.compressed {
1383                        record_block.ingest_compressed_bytes(
1384                            block_data,
1385                            header.qual,
1386                            header.headers,
1387                            header.flags,
1388                        )?;
1389                    } else {
1390                        record_block.ingest_bytes(
1391                            block_data,
1392                            header.qual,
1393                            header.headers,
1394                            header.flags,
1395                        );
1396                    }
1397
1398                    // Update the record block index
1399                    record_block.update_index(block_range.cumulative_records as usize);
1400
1401                    // Process records in this block that fall within our range
1402                    for record in record_block.iter() {
1403                        let global_record_idx = record.index as usize;
1404
1405                        // Only process records within our specified range
1406                        if global_record_idx >= range.start && global_record_idx < range.end {
1407                            proc.process_record(record)?;
1408                        }
1409                    }
1410
1411                    // Signal batch completion
1412                    proc.on_batch_complete()?;
1413                }
1414
1415                Ok(())
1416            });
1417
1418            handles.push(handle);
1419        }
1420
1421        // Wait for all threads to complete
1422        for handle in handles {
1423            handle.join().unwrap()?;
1424        }
1425
1426        Ok(())
1427    }
1428}