binseq/vbq/reader.rs
1//! Reader implementation for VBINSEQ files
2//!
3//! This module provides functionality for reading sequence data from VBINSEQ files,
4//! including support for compressed blocks, quality scores, paired-end reads, and sequence headers.
5//!
6//! ## Format Changes (v0.7.0+)
7//!
8//! - **Embedded Index**: Readers now load the index from within VBQ files instead of separate `.vqi` files
9//! - **Headers Support**: Optional sequence headers/identifiers can be read from each record
10//! - **Multi-bit Encoding**: Support for reading 2-bit and 4-bit nucleotide encodings
11//! - **Extended Capacity**: u64 indexing supports files with more than 4 billion records
12//!
13//! ## Index Loading
14//!
15//! The reader automatically loads the embedded index from the end of VBQ files:
16//! 1. Seeks to the end of the file to read the index trailer
17//! 2. Validates the INDEX_END_MAGIC marker
18//! 3. Reads the index size and decompresses the embedded index
19//! 4. Uses the index for efficient random access and parallel processing
20//!
21//! ## Memory-Mapped Reading
22//!
23//! The `MmapReader` provides efficient access to large files through memory mapping:
24//! - Zero-copy access to file data
25//! - Efficient random access using the embedded index
26//! - Support for parallel processing across record ranges
27//!
28//! ## Example
29//!
30//! ```rust,no_run
31//! use binseq::vbq::MmapReader;
32//! use binseq::BinseqRecord;
33//!
34//! // Open a VBQ file (index is automatically loaded)
35//! let mut reader = MmapReader::new("example.vbq").unwrap();
36//! let mut block = reader.new_block();
37//!
38//! // Read records with headers and quality scores
39//! let mut seq_buffer = Vec::new();
40//! let mut header_buffer = Vec::new();
41//! while reader.read_block_into(&mut block).unwrap() {
42//! for record in block.iter() {
43//! record.decode_s(&mut seq_buffer).unwrap();
44//! record.sheader(&mut header_buffer);
45//! println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap());
46//! println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap());
47//! if !record.squal().is_empty() {
48//! println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap());
49//! }
50//! seq_buffer.clear();
51//! header_buffer.clear();
52//! }
53//! }
54//! ```
55
56use std::fs::File;
57use std::io::Read;
58use std::ops::Range;
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61
62use bitnuc::BitSize;
63use byteorder::{ByteOrder, LittleEndian};
64use memmap2::Mmap;
65use zstd::Decoder;
66
67use super::{
68 header::{SIZE_BLOCK_HEADER, SIZE_HEADER},
69 BlockHeader, BlockIndex, BlockRange, VBinseqHeader,
70};
71use crate::vbq::index::{IndexHeader, INDEX_END_MAGIC, INDEX_HEADER_SIZE};
72use crate::{
73 error::{ReadError, Result},
74 BinseqRecord, ParallelProcessor, ParallelReader,
75};
76
77/// Calculates the number of 64-bit words needed to store a nucleotide sequence of the given length
78///
79/// Nucleotides are packed into 64-bit words with 2 bits per nucleotide (32 nucleotides per word).
80/// This function calculates how many 64-bit words are needed to encode a sequence of a given length.
81///
82/// # Parameters
83///
84/// * `len` - Length of the nucleotide sequence in basepairs
85///
86/// # Returns
87///
88/// The number of 64-bit words required to encode the sequence
89fn encoded_sequence_len(len: u64, bitsize: BitSize) -> usize {
90 match bitsize {
91 BitSize::Two => len.div_ceil(32) as usize,
92 BitSize::Four => len.div_ceil(16) as usize,
93 }
94}
95
96/// A container for a block of VBINSEQ records
97///
98/// The `RecordBlock` struct represents a single block of records read from a VBINSEQ file.
99/// It stores the raw data for multiple records in vectors, allowing efficient iteration
100/// over the records without copying memory for each record.
101///
102/// ## Format Support (v0.7.0+)
103///
104/// - Supports reading records with optional sequence headers
105/// - Handles both 2-bit and 4-bit nucleotide encodings
106/// - Supports quality scores and paired sequences
107/// - Compatible with both compressed and uncompressed blocks
108///
109/// The `RecordBlock` is reused when reading blocks sequentially from a file, with its
110/// contents being cleared and replaced with each new block that is read.
111///
112/// # Examples
113///
114/// ```rust,no_run
115/// use binseq::vbq::MmapReader;
116///
117/// let reader = MmapReader::new("example.vbq").unwrap();
118/// let mut block = reader.new_block(); // Create a block with appropriate size
119/// ```
120pub struct RecordBlock {
121 /// Bitsize of the records in the block
122 bitsize: BitSize,
123
124 /// Index of the first record in the block
125 /// This allows records to maintain their global position in the file
126 index: usize,
127
128 /// Buffer containing all record flags in the block
129 /// Each record has one flag value stored at the corresponding position
130 flags: Vec<Option<u64>>,
131
132 /// Buffer containing all sequence lengths in the block
133 /// For each record, two consecutive entries are stored: primary sequence length and extended sequence length
134 lens: Vec<u64>,
135
136 /// Buffer containing all header lengths in the block
137 /// For each record, two consecutive entries are stored: primary header length and extended header length
138 header_lengths: Vec<u64>,
139
140 /// Buffer containing all packed nucleotide sequences in the block
141 /// Nucleotides are encoded as 2-bit values (4 nucleotides per byte)
142 sequences: Vec<u64>,
143
144 /// Buffer containing all quality scores in the block
145 /// Quality scores are stored as raw bytes, one byte per nucleotide
146 qualities: Vec<u8>,
147
148 /// Buffer containing all headers in the block
149 headers: Vec<u8>,
150
151 /// Maximum size of the block in bytes
152 /// This is derived from the file header's block size field
153 block_size: usize,
154
155 /// Reusable buffer for temporary storage during decompression
156 /// Using a reusable buffer reduces memory allocations
157 rbuf: Vec<u8>,
158}
159impl RecordBlock {
160 /// Creates a new empty `RecordBlock` with the specified block size
161 ///
162 /// The block size should match the one specified in the VBINSEQ file header
163 /// for proper operation. This is typically handled automatically when using
164 /// `MmapReader::new_block()`.
165 ///
166 /// # Parameters
167 ///
168 /// * `bitsize` - Bitsize of the records in the block
169 /// * `block_size` - Maximum size of the block in bytes
170 ///
171 /// # Returns
172 ///
173 /// A new empty `RecordBlock` instance
174 #[must_use]
175 pub fn new(bitsize: BitSize, block_size: usize) -> Self {
176 Self {
177 bitsize,
178 index: 0,
179 flags: Vec::new(),
180 lens: Vec::new(),
181 header_lengths: Vec::new(),
182 sequences: Vec::new(),
183 qualities: Vec::new(),
184 headers: Vec::new(),
185 block_size,
186 rbuf: Vec::new(),
187 }
188 }
189
190 /// Returns the number of records in this block
191 ///
192 /// # Returns
193 ///
194 /// The number of records currently stored in this block
195 #[must_use]
196 pub fn n_records(&self) -> usize {
197 self.flags.len()
198 }
199
200 /// Returns an iterator over the records in this block
201 ///
202 /// The iterator yields `RefRecord` instances that provide access to the record data
203 /// without copying the underlying data.
204 ///
205 /// # Returns
206 ///
207 /// An iterator over the records in this block
208 ///
209 /// # Examples
210 ///
211 /// ```rust,no_run
212 /// use binseq::vbq::MmapReader;
213 /// use binseq::BinseqRecord;
214 ///
215 /// let mut reader = MmapReader::new("example.vbq").unwrap();
216 /// let mut block = reader.new_block();
217 /// reader.read_block_into(&mut block).unwrap();
218 ///
219 /// // Iterate over records in the block
220 /// for record in block.iter() {
221 /// println!("Record {}", record.index());
222 /// }
223 /// ```
224 #[must_use]
225 #[allow(clippy::iter_without_into_iter)]
226 pub fn iter(&self) -> RecordBlockIter<'_> {
227 RecordBlockIter::new(self)
228 }
229
230 /// Updates the starting index of the block
231 ///
232 /// This is used internally to keep track of the global position of records
233 /// within the file, allowing each record to maintain its original index.
234 ///
235 /// # Parameters
236 ///
237 /// * `index` - The index of the first record in the block
238 fn update_index(&mut self, index: usize) {
239 self.index = index;
240 }
241
242 /// Clears all data from the block
243 ///
244 /// This method resets the block to an empty state, clearing all vectors and resetting
245 /// the index to 0. This is typically used when reusing a block for reading a new block
246 /// from a file.
247 pub fn clear(&mut self) {
248 self.index = 0;
249 self.flags.clear();
250 self.lens.clear();
251 self.header_lengths.clear();
252 self.sequences.clear();
253 self.qualities.clear();
254 self.headers.clear();
255 }
256
257 /// Ingest the bytes from a block into the record block
258 ///
259 /// This method takes a slice of bytes and processes it to extract
260 /// the records from the block. It is used when reading a block from
261 /// a file into a record block.
262 ///
263 /// This is a private method used primarily for parallel processing.
264 ///
265 /// # Parameters
266 ///
267 /// * `bytes` - A slice of bytes containing the block data
268 /// * `has_quality` - A boolean indicating whether the block contains quality scores
269 /// * `has_header` - A boolean indicating whether the block contains headers
270 ///
271 /// # Returns
272 ///
273 /// A `Result` indicating success or an error
274 fn ingest_bytes(&mut self, bytes: &[u8], has_quality: bool, has_header: bool, has_flags: bool) {
275 let mut pos = 0;
276 loop {
277 // Read the flag and advance the position
278 let flag = if has_flags {
279 if pos + 24 > bytes.len() {
280 break;
281 }
282 let flag = LittleEndian::read_u64(&bytes[pos..pos + 8]);
283 pos += 8;
284 Some(flag)
285 } else {
286 if pos + 16 > bytes.len() {
287 break;
288 }
289 None
290 };
291
292 // Read the primary length and advance the position
293 let slen = LittleEndian::read_u64(&bytes[pos..pos + 8]);
294 pos += 8;
295
296 // Read the extended length and advance the position
297 let xlen = LittleEndian::read_u64(&bytes[pos..pos + 8]);
298 pos += 8;
299
300 // No more records in the block
301 if slen == 0 {
302 // It is possible to end up here if the block is not full
303 // In this case the flag and the length are both zero
304 // and effectively blank but initialized memory.
305 break;
306 }
307
308 // Add the record to the block
309 self.flags.push(flag);
310 self.lens.push(slen);
311 self.lens.push(xlen);
312
313 // Add the primary sequence to the block
314 let mut seq = [0u8; 8];
315 for _ in 0..encoded_sequence_len(slen, self.bitsize) {
316 seq.copy_from_slice(&bytes[pos..pos + 8]);
317 self.sequences.push(LittleEndian::read_u64(&seq));
318 pos += 8;
319 }
320
321 // Add the primary quality score to the block
322 if has_quality {
323 let qual_buffer = &bytes[pos..pos + slen as usize];
324 self.qualities.extend_from_slice(qual_buffer);
325 pos += slen as usize;
326 }
327
328 // Add the primary header to the block
329 if has_header {
330 let s_header_length = LittleEndian::read_u64(&bytes[pos..pos + 8]);
331 self.header_lengths.push(s_header_length);
332 pos += 8; // Fixed: advance by 8 bytes for u64
333
334 let s_header_buffer = &bytes[pos..pos + s_header_length as usize];
335 self.headers.extend_from_slice(s_header_buffer);
336 pos += s_header_length as usize;
337 }
338
339 // Add the extended sequence to the block
340 for _ in 0..encoded_sequence_len(xlen, self.bitsize) {
341 seq.copy_from_slice(&bytes[pos..pos + 8]);
342 self.sequences.push(LittleEndian::read_u64(&seq));
343 pos += 8;
344 }
345
346 // Add the extended quality score to the block
347 if has_quality {
348 let qual_buffer = &bytes[pos..pos + xlen as usize];
349 self.qualities.extend_from_slice(qual_buffer);
350 pos += xlen as usize;
351 }
352
353 // Add the extended header to the block
354 if has_header && xlen > 0 {
355 let x_header_length = LittleEndian::read_u64(&bytes[pos..pos + 8]);
356 self.header_lengths.push(x_header_length);
357 pos += 8; // Fixed: advance by 8 bytes for u64
358
359 let x_header_buffer = &bytes[pos..pos + x_header_length as usize];
360 self.headers.extend_from_slice(x_header_buffer);
361 pos += x_header_length as usize;
362 }
363 }
364 }
365
366 fn ingest_compressed_bytes(
367 &mut self,
368 bytes: &[u8],
369 has_quality: bool,
370 has_header: bool,
371 has_flags: bool,
372 ) -> Result<()> {
373 let mut decoder = Decoder::with_buffer(bytes)?;
374
375 let mut pos = 0;
376 loop {
377 let (flag, slen, xlen) = if has_flags {
378 // Check that we have enough bytes to at least read the flag
379 // and lengths. If not, break out of the loop.
380 if pos + 24 > self.block_size {
381 break;
382 }
383
384 // Pull the preambles out of the compressed block and advance the position
385 let mut preamble = [0u8; 24];
386 decoder.read_exact(&mut preamble)?;
387 pos += 24;
388
389 // Read the flag + lengths
390 let flag = LittleEndian::read_u64(&preamble[0..8]);
391 let slen = LittleEndian::read_u64(&preamble[8..16]);
392 let xlen = LittleEndian::read_u64(&preamble[16..24]);
393 (Some(flag), slen, xlen)
394 } else {
395 // Check that we have enough bytes to at least read the
396 // lengths. If not, break out of the loop.
397 if pos + 16 > self.block_size {
398 break;
399 }
400
401 // Pull the preambles out of the compressed block and advance the position
402 let mut preamble = [0u8; 16];
403 decoder.read_exact(&mut preamble)?;
404 pos += 16;
405
406 // Read the flag + lengths
407 let slen = LittleEndian::read_u64(&preamble[0..8]);
408 let xlen = LittleEndian::read_u64(&preamble[8..16]);
409 (None, slen, xlen)
410 };
411
412 // No more records in the block
413 if slen == 0 {
414 // It is possible to end up here if the block is not full
415 // In this case the flag and the length are both zero
416 // and effectively blank but initialized memory.
417 break;
418 }
419
420 // Add the record to the block
421 self.flags.push(flag);
422 self.lens.push(slen);
423 self.lens.push(xlen);
424
425 // Read the primary sequence and advance the position
426 let schunk = encoded_sequence_len(slen, self.bitsize);
427 let schunk_bytes = schunk * 8;
428 self.rbuf.resize(schunk_bytes, 0);
429 decoder.read_exact(&mut self.rbuf[0..schunk_bytes])?;
430 for chunk in self.rbuf.chunks_exact(8) {
431 let seq_part = LittleEndian::read_u64(chunk);
432 self.sequences.push(seq_part);
433 }
434 self.rbuf.clear();
435 pos += schunk_bytes;
436
437 // Add the primary quality score to the block
438 if has_quality {
439 self.rbuf.resize(slen as usize, 0);
440 decoder.read_exact(&mut self.rbuf[0..slen as usize])?;
441 self.qualities.extend_from_slice(&self.rbuf);
442 self.rbuf.clear();
443 pos += slen as usize;
444 }
445
446 // Add the primary header to the block
447 if has_header {
448 self.rbuf.resize(8, 0);
449 decoder.read_exact(&mut self.rbuf[0..8])?;
450 let s_header_length = LittleEndian::read_u64(&self.rbuf);
451 self.header_lengths.push(s_header_length);
452 self.rbuf.clear();
453 pos += 8;
454
455 self.rbuf.resize(s_header_length as usize, 0);
456 decoder.read_exact(&mut self.rbuf[0..s_header_length as usize])?;
457 self.headers.extend_from_slice(&self.rbuf);
458 self.rbuf.clear();
459 pos += s_header_length as usize;
460 }
461
462 // Read the extended sequence and advance the position
463 let xchunk = encoded_sequence_len(xlen, self.bitsize);
464 let xchunk_bytes = xchunk * 8;
465 self.rbuf.resize(xchunk_bytes, 0);
466 decoder.read_exact(&mut self.rbuf[0..xchunk_bytes])?;
467 for chunk in self.rbuf.chunks_exact(8) {
468 let seq_part = LittleEndian::read_u64(chunk);
469 self.sequences.push(seq_part);
470 }
471 self.rbuf.clear();
472 pos += xchunk_bytes;
473
474 // Add the extended quality score to the block
475 if has_quality {
476 self.rbuf.resize(xlen as usize, 0);
477 decoder.read_exact(&mut self.rbuf[0..xlen as usize])?;
478 self.qualities.extend_from_slice(&self.rbuf);
479 self.rbuf.clear();
480 pos += xlen as usize;
481 }
482
483 // Add the extended header to the block
484 if has_header && xlen > 0 {
485 self.rbuf.resize(8, 0);
486 decoder.read_exact(&mut self.rbuf[0..8])?;
487 let x_header_length = LittleEndian::read_u64(&self.rbuf);
488 self.header_lengths.push(x_header_length);
489 self.rbuf.clear();
490 pos += 8;
491
492 self.rbuf.resize(x_header_length as usize, 0);
493 decoder.read_exact(&mut self.rbuf[0..x_header_length as usize])?;
494 self.headers.extend_from_slice(&self.rbuf);
495 self.rbuf.clear();
496 pos += x_header_length as usize;
497 }
498 }
499 Ok(())
500 }
501}
502
503pub struct RecordBlockIter<'a> {
504 block: &'a RecordBlock,
505 /// Record position in the block
506 rpos: usize,
507 /// Encoded sequence position in the block
508 epos: usize,
509 /// Header position in the block
510 hpos: usize,
511 /// Quality position in the block
512 qpos: usize,
513}
514impl<'a> RecordBlockIter<'a> {
515 #[must_use]
516 pub fn new(block: &'a RecordBlock) -> Self {
517 Self {
518 block,
519 rpos: 0,
520 epos: 0,
521 hpos: 0,
522 qpos: 0,
523 }
524 }
525}
526impl<'a> Iterator for RecordBlockIter<'a> {
527 type Item = RefRecord<'a>;
528
529 fn next(&mut self) -> Option<Self::Item> {
530 if self.rpos == self.block.n_records() {
531 return None;
532 }
533
534 let index = (self.block.index + self.rpos) as u64;
535 let flag = self.block.flags[self.rpos];
536 let slen = self.block.lens[2 * self.rpos];
537 let xlen = self.block.lens[(2 * self.rpos) + 1];
538 let schunk = encoded_sequence_len(slen, self.block.bitsize);
539 let xchunk = encoded_sequence_len(xlen, self.block.bitsize);
540
541 // Handle sequences
542 let s_seq = &self.block.sequences[self.epos..self.epos + schunk];
543 self.epos += schunk;
544
545 let x_seq = &self.block.sequences[self.epos..self.epos + xchunk];
546 self.epos += xchunk;
547
548 // Handle quality scores (separate position tracking)
549 let s_qual = if self.block.qualities.is_empty() {
550 &[]
551 } else {
552 let qual = &self.block.qualities[self.qpos..self.qpos + slen as usize];
553 self.qpos += slen as usize;
554 qual
555 };
556
557 let x_qual = if self.block.qualities.is_empty() {
558 &[]
559 } else {
560 let qual = &self.block.qualities[self.qpos..self.qpos + xlen as usize];
561 self.qpos += xlen as usize;
562 qual
563 };
564
565 // Handle headers (separate position tracking)
566 let header_idx = if xlen > 0 { 2 * self.rpos } else { self.rpos };
567 let mut shlen = 0;
568 let s_header = if self.block.headers.is_empty() {
569 &[]
570 } else {
571 // Get header length
572 shlen = self.block.header_lengths[header_idx];
573
574 // Extract header data
575 let header = &self.block.headers[self.hpos..self.hpos + shlen as usize];
576 self.hpos += shlen as usize;
577 header
578 };
579
580 let mut xhlen = 0;
581 let x_header = if self.block.headers.is_empty() || xlen == 0 {
582 &[]
583 } else {
584 xhlen = self.block.header_lengths[header_idx + 1];
585 let header = &self.block.headers[self.hpos..self.hpos + xhlen as usize];
586 self.hpos += xhlen as usize;
587 header
588 };
589
590 // update record position
591 self.rpos += 1;
592
593 Some(RefRecord::new(
594 self.block.bitsize,
595 index,
596 flag,
597 slen,
598 xlen,
599 s_seq,
600 x_seq,
601 s_qual,
602 x_qual,
603 shlen,
604 s_header,
605 xhlen,
606 x_header,
607 ))
608 }
609}
610
611/// A reference to a record in a VBINSEQ file
612///
613/// `RefRecord` provides a lightweight view into a record within a `RecordBlock`.
614/// It holds references to the underlying data rather than owning it, making it
615/// efficient to iterate through records without copying data.
616///
617/// Each record contains a primary sequence (accessible via `sbuf` and related methods)
618/// and optionally a paired/extended sequence (accessible via `xbuf` and related methods).
619/// Both sequences may also have associated quality scores.
620///
621/// # Examples
622///
623/// ```rust,no_run
624/// use binseq::vbq::MmapReader;
625/// use binseq::BinseqRecord;
626///
627/// let mut reader = MmapReader::new("example.vbq").unwrap();
628/// let mut block = reader.new_block();
629/// reader.read_block_into(&mut block).unwrap();
630///
631/// let mut sequence = Vec::new();
632///
633/// for record in block.iter() {
634/// // Get record metadata
635/// println!("Record {}, flag: {:?}", record.index(), record.flag());
636///
637/// // Decode the primary sequence
638/// record.decode_s(&mut sequence).unwrap();
639/// println!("Sequence: {}", std::str::from_utf8(&sequence).unwrap());
640/// sequence.clear();
641///
642/// // If this is a paired record, decode the paired sequence
643/// if record.is_paired() {
644/// record.decode_x(&mut sequence).unwrap();
645/// println!("Paired sequence: {}", std::str::from_utf8(&sequence).unwrap());
646/// sequence.clear();
647/// }
648///
649/// // Access quality scores if available
650/// if record.has_quality() {
651/// println!("Quality scores available");
652/// }
653/// }
654/// ```
655pub struct RefRecord<'a> {
656 /// Bitsize of the record
657 bitsize: BitSize,
658
659 /// Global index of this record within the file
660 index: u64,
661
662 /// Flag value for this record (can be used for custom metadata)
663 flag: Option<u64>,
664
665 /// Length of the primary sequence in nucleotides
666 slen: u64,
667
668 /// Length of the extended/paired sequence in nucleotides (0 if not paired)
669 xlen: u64,
670
671 /// Buffer containing the encoded primary nucleotide sequence
672 sbuf: &'a [u64],
673
674 /// Buffer containing the encoded extended/paired nucleotide sequence
675 xbuf: &'a [u64],
676
677 /// Quality scores for the primary sequence (empty if quality scores not present)
678 squal: &'a [u8],
679
680 /// Quality scores for the extended/paired sequence (empty if not paired or no quality)
681 xqual: &'a [u8],
682
683 /// Length of the header for the primary sequence in bytes
684 shlen: u64,
685
686 /// Header for the record
687 sheader: &'a [u8],
688
689 /// Length of the header for the extended/paired sequence in bytes
690 xhlen: u64,
691
692 /// Header for the extended/paired sequence (empty if not paired)
693 xheader: &'a [u8],
694}
695impl<'a> RefRecord<'a> {
696 #[allow(clippy::too_many_arguments)]
697 #[must_use]
698 pub fn new(
699 bitsize: BitSize,
700 index: u64,
701 flag: Option<u64>,
702 slen: u64,
703 xlen: u64,
704 sbuf: &'a [u64],
705 xbuf: &'a [u64],
706 squal: &'a [u8],
707 xqual: &'a [u8],
708 shlen: u64,
709 sheader: &'a [u8],
710 xhlen: u64,
711 xheader: &'a [u8],
712 ) -> Self {
713 Self {
714 bitsize,
715 index,
716 flag,
717 slen,
718 xlen,
719 sbuf,
720 xbuf,
721 squal,
722 xqual,
723 shlen,
724 sheader,
725 xhlen,
726 xheader,
727 }
728 }
729}
730
731impl RefRecord<'_> {
732 pub fn shlen(&self) -> u64 {
733 self.shlen
734 }
735 pub fn xhlen(&self) -> u64 {
736 self.xhlen
737 }
738}
739
740impl BinseqRecord for RefRecord<'_> {
741 fn bitsize(&self) -> BitSize {
742 self.bitsize
743 }
744 fn index(&self) -> u64 {
745 self.index
746 }
747 fn sheader(&self, buffer: &mut Vec<u8>) {
748 buffer.clear();
749 if self.sheader.is_empty() {
750 buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes());
751 } else {
752 buffer.extend_from_slice(self.sheader);
753 }
754 }
755 fn xheader(&self, buffer: &mut Vec<u8>) {
756 buffer.clear();
757 if self.sheader.is_empty() {
758 buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes());
759 } else {
760 buffer.extend_from_slice(self.xheader);
761 }
762 }
763 fn flag(&self) -> Option<u64> {
764 self.flag
765 }
766 fn slen(&self) -> u64 {
767 self.slen
768 }
769 fn xlen(&self) -> u64 {
770 self.xlen
771 }
772 fn sbuf(&self) -> &[u64] {
773 self.sbuf
774 }
775 fn xbuf(&self) -> &[u64] {
776 self.xbuf
777 }
778 fn squal(&self) -> &[u8] {
779 self.squal
780 }
781 fn xqual(&self) -> &[u8] {
782 self.xqual
783 }
784}
785
786/// Memory-mapped reader for VBINSEQ files
787///
788/// [`MmapReader`] provides efficient, memory-mapped access to VBINSEQ files. It allows
789/// sequential reading of record blocks and supports parallel processing of records.
790///
791/// ## Format Support (v0.7.0+)
792///
793/// - **Embedded Index**: Automatically loads index from within VBQ files
794/// - **Headers Support**: Reads optional sequence headers/identifiers from records
795/// - **Multi-bit Encoding**: Supports both 2-bit and 4-bit nucleotide encodings
796/// - **Extended Capacity**: u64 indexing supports files with more than 4 billion records
797///
798/// Memory mapping allows the operating system to lazily load file contents as needed,
799/// which can be more efficient than standard file I/O, especially for large files.
800///
801/// The [`MmapReader`] is designed to be used in a multi-threaded environment, and it
802/// is built around [`RecordBlock`]s which are the units of data in a VBINSEQ file.
803/// Each one would be held by a separate thread and would load data from the shared
804/// [`MmapReader`] through the [`MmapReader::read_block_into`] method. However, they can
805/// also be used in a single-threaded environment for sequential processing.
806///
807/// Each [`RecordBlock`] contains a [`BlockHeader`] and is used to access [`RefRecord`]s
808/// which implement the [`BinseqRecord`] trait.
809///
810/// ## Index Loading
811///
812/// The reader automatically loads the embedded index by:
813/// 1. Reading the index trailer from the end of the file
814/// 2. Validating the INDEX_END_MAGIC marker
815/// 3. Decompressing the embedded index data
816/// 4. Using the index for efficient random access and parallel processing
817///
818/// # Examples
819///
820/// ```
821/// use binseq::vbq::MmapReader;
822/// use binseq::{BinseqRecord, Result};
823///
824/// fn main() -> Result<()> {
825/// let path = "./data/subset.vbq";
826/// let mut reader = MmapReader::new(path)?; // Index loaded automatically
827///
828/// // Create buffers for sequence data and headers
829/// let mut seq_buffer = Vec::new();
830/// let mut header_buffer = Vec::new();
831/// let mut block = reader.new_block();
832///
833/// // Read blocks sequentially
834/// while reader.read_block_into(&mut block)? {
835/// println!("Read a block with {} records", block.n_records());
836/// for record in block.iter() {
837/// // Decode sequence and header
838/// record.decode_s(&mut seq_buffer)?;
839/// record.sheader(&mut header_buffer);
840///
841/// println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap_or(""));
842/// println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap_or(""));
843///
844/// seq_buffer.clear();
845/// header_buffer.clear();
846/// }
847/// }
848/// Ok(())
849/// }
850/// ```
851pub struct MmapReader {
852 /// Path to the VBINSEQ file
853 path: PathBuf,
854
855 /// Memory-mapped file contents for efficient access
856 mmap: Arc<Mmap>,
857
858 /// Parsed header information from the file
859 header: VBinseqHeader,
860
861 /// Current cursor position in the file (in bytes)
862 pos: usize,
863
864 /// Total number of records read from the file so far
865 total: usize,
866}
867impl MmapReader {
868 /// Creates a new `MmapReader` for a VBINSEQ file
869 ///
870 /// This method opens the specified file, memory-maps its contents, reads the
871 /// VBINSEQ header information, and loads the embedded index. The reader is positioned
872 /// at the beginning of the first record block after the header.
873 ///
874 /// ## Index Loading (v0.7.0+)
875 ///
876 /// The embedded index is automatically loaded from the end of the file. For legacy
877 /// files with separate `.vqi` index files, the index is automatically migrated to
878 /// the embedded format.
879 ///
880 /// # Parameters
881 ///
882 /// * `path` - Path to the VBINSEQ file to open
883 ///
884 /// # Returns
885 ///
886 /// A new `MmapReader` instance if successful
887 ///
888 /// # Errors
889 ///
890 /// * `ReadError::InvalidFileType` if the path doesn't point to a regular file
891 /// * I/O errors if the file can't be opened or memory-mapped
892 /// * Header validation errors if the file doesn't contain a valid VBINSEQ header
893 ///
894 /// # Examples
895 ///
896 /// ```rust,no_run
897 /// use binseq::vbq::MmapReader;
898 ///
899 /// let reader = MmapReader::new("path/to/file.vbq").unwrap();
900 /// ```
901 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
902 // Verify it's a regular file before attempting to map
903 let file = File::open(&path)?;
904 if !file.metadata()?.is_file() {
905 return Err(ReadError::InvalidFileType.into());
906 }
907
908 // Safety: The file is open and won't be modified while mapped
909 let mmap = unsafe { Mmap::map(&file)? };
910
911 // Read header from mapped memory
912 let header = {
913 let mut header_bytes = [0u8; SIZE_HEADER];
914 header_bytes.copy_from_slice(&mmap[..SIZE_HEADER]);
915 VBinseqHeader::from_bytes(&header_bytes)?
916 };
917
918 Ok(Self {
919 path: PathBuf::from(path.as_ref()),
920 mmap: Arc::new(mmap),
921 header,
922 pos: SIZE_HEADER,
923 total: 0,
924 })
925 }
926
927 /// Creates a new empty record block with the appropriate size for this file
928 ///
929 /// This creates a `RecordBlock` with a block size matching the one specified in the
930 /// file's header, ensuring it will be able to hold a full block of records.
931 ///
932 /// # Returns
933 ///
934 /// A new empty `RecordBlock` instance sized appropriately for this file
935 ///
936 /// # Examples
937 ///
938 /// ```rust,no_run
939 /// use binseq::vbq::MmapReader;
940 ///
941 /// let reader = MmapReader::new("example.vbq").unwrap();
942 /// let mut block = reader.new_block();
943 /// ```
944 #[must_use]
945 pub fn new_block(&self) -> RecordBlock {
946 RecordBlock::new(self.header.bits, self.header.block as usize)
947 }
948
949 /// Returns the path where the index file would be located
950 ///
951 /// The index file is used for random access to blocks and has the same path as
952 /// the VBINSEQ file with the ".vqi" extension appended.
953 ///
954 /// # Returns
955 ///
956 /// The path where the index file would be located
957 ///
958 /// # Examples
959 ///
960 /// ```
961 /// use binseq::vbq::MmapReader;
962 /// use binseq::Result;
963 ///
964 /// fn main() -> Result<()> {
965 /// let path = "./data/subset.vbq";
966 /// let reader = MmapReader::new(path)?;
967 /// let index_path = reader.index_path();
968 /// assert_eq!(index_path.to_str(), Some("./data/subset.vbq.vqi"));
969 /// Ok(())
970 /// }
971 /// ```
972 #[must_use]
973 pub fn index_path(&self) -> PathBuf {
974 let mut p = self.path.as_os_str().to_owned();
975 p.push(".vqi");
976 p.into()
977 }
978
979 /// Returns a copy of the file's header information
980 ///
981 /// The header contains information about the file format, including whether
982 /// quality scores are included, whether blocks are compressed, and whether
983 /// records are paired.
984 ///
985 /// # Returns
986 ///
987 /// A copy of the file's `VBinseqHeader`
988 #[must_use]
989 pub fn header(&self) -> VBinseqHeader {
990 self.header
991 }
992
993 /// Checks if the file contains paired records
994 #[must_use]
995 pub fn is_paired(&self) -> bool {
996 self.header.is_paired()
997 }
998
999 /// Fills an existing `RecordBlock` with the next block of records from the file
1000 ///
1001 /// This method reads the next block of records from the current position in the file
1002 /// and populates the provided `RecordBlock` with the data. The block is cleared and reused
1003 /// to avoid unnecessary memory allocations. This is the primary method for sequential
1004 /// reading of VBINSEQ files.
1005 ///
1006 /// The method automatically handles decompression if the file was written with
1007 /// compression enabled and updates the total record count as it progresses through the file.
1008 ///
1009 /// # Parameters
1010 ///
1011 /// * `block` - A mutable reference to a `RecordBlock` to be filled with data
1012 ///
1013 /// # Returns
1014 ///
1015 /// * `Ok(true)` - If a block was successfully read
1016 /// * `Ok(false)` - If the end of the file was reached (no more blocks)
1017 /// * `Err(_)` - If an error occurred during reading
1018 ///
1019 /// # Examples
1020 ///
1021 /// ```rust,no_run
1022 /// use binseq::vbq::MmapReader;
1023 /// use binseq::BinseqRecord;
1024 /// use std::io::Write;
1025 ///
1026 /// let mut reader = MmapReader::new("example.vbq").unwrap();
1027 /// let mut block = reader.new_block();
1028 /// let mut sequence_buffer = Vec::new();
1029 ///
1030 /// // Read blocks until the end of file
1031 /// while reader.read_block_into(&mut block).unwrap() {
1032 /// println!("Read block with {} records", block.n_records());
1033 ///
1034 /// // Process each record
1035 /// for record in block.iter() {
1036 /// // Decode the nucleotide sequence
1037 /// record.decode_s(&mut sequence_buffer).unwrap();
1038 ///
1039 /// // Do something with the sequence
1040 /// println!("Record {}: length {}", record.index(), sequence_buffer.len());
1041 /// sequence_buffer.clear();
1042 /// }
1043 /// }
1044 /// ```
1045 pub fn read_block_into(&mut self, block: &mut RecordBlock) -> Result<bool> {
1046 // Clear the block
1047 block.clear();
1048
1049 // Validate the next block header is within bounds and present
1050 if self.pos + SIZE_BLOCK_HEADER > self.mmap.len() {
1051 return Ok(false);
1052 }
1053 let mut header_bytes = [0u8; SIZE_BLOCK_HEADER];
1054 header_bytes.copy_from_slice(&self.mmap[self.pos..self.pos + SIZE_BLOCK_HEADER]);
1055 let header = match BlockHeader::from_bytes(&header_bytes) {
1056 Ok(header) => {
1057 self.pos += SIZE_BLOCK_HEADER;
1058 header
1059 }
1060 // Bytes left - but not a BlockHeader - could be the index
1061 Err(e) => {
1062 let mut index_header_bytes = [0u8; INDEX_HEADER_SIZE];
1063 index_header_bytes
1064 .copy_from_slice(&self.mmap[self.pos..self.pos + INDEX_HEADER_SIZE]);
1065 if IndexHeader::from_bytes(&index_header_bytes).is_ok() {
1066 // Expected end of file
1067 return Ok(false);
1068 }
1069 return Err(e.into());
1070 }
1071 };
1072
1073 // Read the block contents
1074 let rbound = if self.header.compressed {
1075 header.size as usize
1076 } else {
1077 self.header.block as usize
1078 };
1079 if self.pos + rbound > self.mmap.len() {
1080 return Err(ReadError::UnexpectedEndOfFile(self.pos).into());
1081 }
1082 let block_buffer = &self.mmap[self.pos..self.pos + rbound];
1083 if self.header.compressed {
1084 block.ingest_compressed_bytes(
1085 block_buffer,
1086 self.header.qual,
1087 self.header.headers,
1088 self.header.flags,
1089 )?;
1090 } else {
1091 block.ingest_bytes(
1092 block_buffer,
1093 self.header.qual,
1094 self.header.headers,
1095 self.header.flags,
1096 );
1097 }
1098
1099 // Update the block index
1100 block.update_index(self.total);
1101
1102 self.pos += rbound;
1103 self.total += header.records as usize;
1104
1105 Ok(true)
1106 }
1107
1108 /// Loads or creates the block index for this VBINSEQ file
1109 ///
1110 /// The block index provides metadata about each block in the file, enabling
1111 /// random access to blocks and parallel processing. This method first attempts to
1112 /// load an existing index file. If the index doesn't exist or doesn't match the
1113 /// current file, it automatically generates a new index from the VBINSEQ file
1114 /// and saves it for future use.
1115 ///
1116 /// # Returns
1117 ///
1118 /// The loaded or newly created `BlockIndex` if successful
1119 ///
1120 /// # Errors
1121 ///
1122 /// * File I/O errors when reading or creating the index
1123 /// * Parsing errors if the VBINSEQ file has invalid format
1124 /// * Other index-related errors that cannot be resolved by creating a new index
1125 ///
1126 /// # Examples
1127 ///
1128 /// ```rust,no_run
1129 /// use binseq::vbq::MmapReader;
1130 ///
1131 /// let reader = MmapReader::new("example.vbq").unwrap();
1132 ///
1133 /// // Load the index file (or create if it doesn't exist)
1134 /// let index = reader.load_index().unwrap();
1135 ///
1136 /// // Use the index to get information about the file
1137 /// println!("Number of blocks: {}", index.n_blocks());
1138 /// ```
1139 ///
1140 /// # Notes
1141 ///
1142 /// The index file is stored with the same path as the VBINSEQ file but with a ".vqi"
1143 /// extension appended. This allows for reusing the index across multiple runs,
1144 /// which can significantly improve startup performance for large files.
1145 pub fn load_index(&self) -> Result<BlockIndex> {
1146 let start_pos_magic = self.mmap.len() - 8;
1147 let start_pos_index_size = start_pos_magic - 8;
1148
1149 // Validate the magic number
1150 let magic = LittleEndian::read_u64(&self.mmap[start_pos_magic..]);
1151 if magic != INDEX_END_MAGIC {
1152 return Err(ReadError::MissingIndexEndMagic.into());
1153 }
1154
1155 // Get the index size
1156 let index_size = LittleEndian::read_u64(&self.mmap[start_pos_index_size..start_pos_magic]);
1157
1158 // Determine the start position of the index bytes
1159 let start_pos_index = start_pos_index_size - index_size as usize;
1160
1161 // Slice into the index bytes
1162 let index_bytes = &self.mmap[start_pos_index..start_pos_index_size];
1163
1164 // Build the index from the bytes
1165 BlockIndex::from_bytes(index_bytes)
1166 }
1167
1168 pub fn num_records(&self) -> Result<usize> {
1169 let index = self.load_index()?;
1170 Ok(index.num_records())
1171 }
1172}
1173
1174impl ParallelReader for MmapReader {
1175 /// Processes all records in the file in parallel using multiple threads
1176 ///
1177 /// This method provides efficient parallel processing of VBINSEQ files by distributing
1178 /// blocks across multiple worker threads. The file's block structure is leveraged to divide
1179 /// the work evenly without requiring thread synchronization during processing, which leads
1180 /// to near-linear scaling with the number of threads.
1181 ///
1182 /// The method automatically loads or creates an index file to identify block boundaries,
1183 /// then distributes the blocks among the requested number of threads. Each thread processes
1184 /// its assigned blocks sequentially, but multiple blocks are processed in parallel across
1185 /// threads.
1186 ///
1187 /// # Type Parameters
1188 ///
1189 /// * `P` - A type that implements the `ParallelProcessor` trait, which defines how records are processed
1190 ///
1191 /// # Parameters
1192 ///
1193 /// * `self` - Consumes the reader, as it will be used across multiple threads
1194 /// * `processor` - An instance of a type implementing `ParallelProcessor` that will be cloned for each thread
1195 /// * `num_threads` - Number of worker threads to use for processing
1196 ///
1197 /// # Returns
1198 ///
1199 /// * `Ok(())` - If all records were successfully processed
1200 /// * `Err(_)` - If an error occurs during processing
1201 ///
1202 /// # Examples
1203 ///
1204 /// ```rust,no_run
1205 /// use binseq::vbq::{MmapReader, RefRecord};
1206 /// use binseq::{ParallelProcessor, ParallelReader, BinseqRecord, Result};
1207 /// use std::sync::atomic::{AtomicUsize, Ordering};
1208 /// use std::sync::Arc;
1209 ///
1210 /// // Create a simple processor that counts records
1211 /// struct RecordCounter {
1212 /// count: Arc<AtomicUsize>,
1213 /// thread_id: usize,
1214 /// }
1215 ///
1216 /// impl RecordCounter {
1217 /// fn new() -> Self {
1218 /// Self {
1219 /// count: Arc::new(AtomicUsize::new(0)),
1220 /// thread_id: 0,
1221 /// }
1222 /// }
1223 ///
1224 /// fn total_count(&self) -> usize {
1225 /// self.count.load(Ordering::Relaxed)
1226 /// }
1227 /// }
1228 ///
1229 /// impl Clone for RecordCounter {
1230 /// fn clone(&self) -> Self {
1231 /// Self {
1232 /// count: Arc::clone(&self.count),
1233 /// thread_id: 0,
1234 /// }
1235 /// }
1236 /// }
1237 ///
1238 /// impl ParallelProcessor for RecordCounter {
1239 /// fn process_record<R: BinseqRecord>(&mut self, _record: R) -> Result<()> {
1240 /// self.count.fetch_add(1, Ordering::Relaxed);
1241 /// Ok(())
1242 /// }
1243 ///
1244 /// fn on_batch_complete(&mut self) -> Result<()> {
1245 /// // Optional: perform actions after each block is processed
1246 /// Ok(())
1247 /// }
1248 ///
1249 /// fn set_tid(&mut self, tid: usize) {
1250 /// self.thread_id = tid;
1251 /// }
1252 /// }
1253 ///
1254 /// // Use the processor with a VBINSEQ file
1255 /// let reader = MmapReader::new("example.vbq").unwrap();
1256 /// let counter = RecordCounter::new();
1257 ///
1258 /// // Process the file with 4 threads
1259 /// reader.process_parallel(counter.clone(), 4).unwrap();
1260 ///
1261 /// // Get the total number of records processed
1262 /// println!("Total records: {}", counter.total_count());
1263 /// ```
1264 ///
1265 /// # Notes
1266 ///
1267 /// * The `ParallelProcessor` instance is cloned for each worker thread, so any shared state
1268 /// should be wrapped in thread-safe containers like `Arc`.
1269 /// * The `set_tid` method is called with a unique thread ID before processing begins, which
1270 /// can be used to distinguish between worker threads.
1271 /// * This method consumes the reader (takes ownership), as it's distributed across threads.
1272 fn process_parallel<P: ParallelProcessor + Clone + 'static>(
1273 self,
1274 processor: P,
1275 num_threads: usize,
1276 ) -> Result<()> {
1277 let num_records = self.num_records()?;
1278 self.process_parallel_range(processor, num_threads, 0..num_records)
1279 }
1280
1281 /// Process records in parallel within a specified range
1282 ///
1283 /// This method allows parallel processing of a subset of records within the file,
1284 /// defined by a start and end index. The method maps the record range to the
1285 /// appropriate blocks and processes only the records within the specified range.
1286 ///
1287 /// # Arguments
1288 ///
1289 /// * `processor` - The processor to use for each record
1290 /// * `num_threads` - The number of threads to spawn
1291 /// * `start` - The starting record index (inclusive)
1292 /// * `end` - The ending record index (exclusive)
1293 ///
1294 /// # Type Parameters
1295 ///
1296 /// * `P` - A type that implements `ParallelProcessor` and can be cloned
1297 ///
1298 /// # Returns
1299 ///
1300 /// * `Ok(())` - If all records were processed successfully
1301 /// * `Err(Error)` - If an error occurred during processing
1302 fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
1303 self,
1304 processor: P,
1305 num_threads: usize,
1306 range: Range<usize>,
1307 ) -> Result<()> {
1308 // Calculate the number of threads to use
1309 let num_threads = if num_threads == 0 {
1310 num_cpus::get()
1311 } else {
1312 num_threads.min(num_cpus::get())
1313 };
1314
1315 // Generate or load the index first
1316 let index = self.load_index()?;
1317
1318 // Validate range
1319 let total_records = index.num_records();
1320 if range.start >= total_records || range.end > total_records || range.start >= range.end {
1321 return Ok(()); // Nothing to process or invalid range
1322 }
1323
1324 // Find blocks that contain records in the specified range
1325 let relevant_blocks = index
1326 .ranges()
1327 .iter()
1328 .filter(|r| {
1329 let iv_start = r.cumulative_records as usize;
1330 let iv_end = (r.cumulative_records + r.block_records as u64) as usize;
1331 iv_start < range.end && iv_end > range.start
1332 })
1333 .copied()
1334 .collect::<Vec<_>>();
1335
1336 if relevant_blocks.is_empty() {
1337 return Ok(()); // No relevant blocks
1338 }
1339
1340 // Calculate block assignments for threads
1341 let blocks_per_thread = relevant_blocks.len().div_ceil(num_threads);
1342
1343 // Create shared resources
1344 let mmap = Arc::clone(&self.mmap);
1345 let header = self.header;
1346
1347 // Spawn worker threads
1348 let mut handles = Vec::new();
1349
1350 for thread_id in 0..num_threads {
1351 // Calculate this thread's block range
1352 let start_block_idx = thread_id * blocks_per_thread;
1353 let end_block_idx =
1354 std::cmp::min((thread_id + 1) * blocks_per_thread, relevant_blocks.len());
1355
1356 if start_block_idx >= relevant_blocks.len() {
1357 continue;
1358 }
1359
1360 let mmap = Arc::clone(&mmap);
1361 let mut proc = processor.clone();
1362 proc.set_tid(thread_id);
1363
1364 // Get block ranges for this thread
1365 let thread_blocks: Vec<BlockRange> =
1366 relevant_blocks[start_block_idx..end_block_idx].to_vec();
1367
1368 let handle = std::thread::spawn(move || -> Result<()> {
1369 // Create block to reuse for processing (within thread)
1370 let mut record_block = RecordBlock::new(header.bits, header.block as usize);
1371
1372 // Process each assigned block
1373 for block_range in thread_blocks {
1374 // Clear the block for reuse
1375 record_block.clear();
1376
1377 // Skip the block header to get to data
1378 let block_start = block_range.start_offset as usize + SIZE_BLOCK_HEADER;
1379 let block_data = &mmap[block_start..block_start + block_range.len as usize];
1380
1381 // Ingest data according to the compression setting
1382 if header.compressed {
1383 record_block.ingest_compressed_bytes(
1384 block_data,
1385 header.qual,
1386 header.headers,
1387 header.flags,
1388 )?;
1389 } else {
1390 record_block.ingest_bytes(
1391 block_data,
1392 header.qual,
1393 header.headers,
1394 header.flags,
1395 );
1396 }
1397
1398 // Update the record block index
1399 record_block.update_index(block_range.cumulative_records as usize);
1400
1401 // Process records in this block that fall within our range
1402 for record in record_block.iter() {
1403 let global_record_idx = record.index as usize;
1404
1405 // Only process records within our specified range
1406 if global_record_idx >= range.start && global_record_idx < range.end {
1407 proc.process_record(record)?;
1408 }
1409 }
1410
1411 // Signal batch completion
1412 proc.on_batch_complete()?;
1413 }
1414
1415 Ok(())
1416 });
1417
1418 handles.push(handle);
1419 }
1420
1421 // Wait for all threads to complete
1422 for handle in handles {
1423 handle.join().unwrap()?;
1424 }
1425
1426 Ok(())
1427 }
1428}