use std::fs::File;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use bitnuc::BitSize;
use byteorder::{ByteOrder, LittleEndian};
use memmap2::Mmap;
use zstd::zstd_safe;
use super::{
BlockHeader, BlockIndex, BlockRange, FileHeader,
header::{SIZE_BLOCK_HEADER, SIZE_HEADER},
};
use crate::DEFAULT_QUALITY_SCORE;
use crate::vbq::index::{INDEX_END_MAGIC, INDEX_HEADER_SIZE, IndexHeader};
use crate::{
BinseqRecord, ParallelProcessor, ParallelReader,
error::{ReadError, Result},
};
/// Number of 64-bit words required to hold `len` bases at the given
/// encoding width (32 bases/word for 2-bit, 16 bases/word for 4-bit).
fn encoded_sequence_len(len: u64, bitsize: BitSize) -> usize {
    let bases_per_word: u64 = match bitsize {
        BitSize::Two => 32,
        BitSize::Four => 16,
    };
    len.div_ceil(bases_per_word) as usize
}
/// A half-open `[offset, offset + len)` window into some backing buffer.
///
/// The buffer a span indexes into (raw block bytes vs. packed sequence
/// words) is determined by the caller, not by the span itself.
#[derive(Clone, Copy, Debug, Default)]
pub struct Span {
    offset: usize,
    len: usize,
}
impl Span {
    /// Creates a span covering `len` elements starting at `offset`.
    fn new(offset: usize, len: usize) -> Self {
        Span { offset, len }
    }
    /// Borrows the byte sub-slice this span describes.
    ///
    /// Panics if the span extends past the end of `buffer`.
    fn slice<'a>(&self, buffer: &'a [u8]) -> &'a [u8] {
        let end = self.offset + self.len;
        &buffer[self.offset..end]
    }
    /// Borrows the u64-word sub-slice this span describes.
    ///
    /// Panics if the span extends past the end of `buffer`.
    fn slice_u64<'a>(&self, buffer: &'a [u64]) -> &'a [u64] {
        let end = self.offset + self.len;
        &buffer[self.offset..end]
    }
}
/// Parsed layout information for one record inside a [`RecordBlock`].
///
/// The `*_seq_span` fields index into the block's packed `sequences`
/// word buffer (offsets in u64 words); the quality and header spans
/// index into the raw block bytes (`rbuf`, offsets in bytes).
#[derive(Debug, Clone, Copy)]
struct RecordMetadata {
    // Optional per-record flag word; `Some` only when the file stores flags.
    flag: Option<u64>,
    // Primary sequence length in bases.
    slen: u64,
    // Extended (paired) sequence length in bases; 0 when absent.
    xlen: u64,
    // Primary record: sequence (into `sequences`), quality/header (into `rbuf`).
    s_seq_span: Span, s_qual_span: Span, s_header_span: Span,
    // Extended record: same conventions as the primary spans.
    x_seq_span: Span, x_qual_span: Span, x_header_span: Span,
    // Whether quality bytes were stored in the file for this record.
    has_quality: bool,
}
/// One block of records read (and optionally decompressed) from a VBQ file.
///
/// Owns reusable buffers so repeated `read_block_into` calls do not
/// reallocate; `clear` resets parsed state between blocks.
pub struct RecordBlock {
    // Sequence encoding width (2-bit or 4-bit) from the file header.
    bitsize: BitSize,
    // Global index of the first record in this block.
    index: usize,
    // Raw (uncompressed) block bytes.
    rbuf: Vec<u8>,
    // Packed sequence words gathered from every record in the block.
    sequences: Vec<u64>,
    // Per-record layout metadata parsed out of `rbuf`.
    records: Vec<RecordMetadata>,
    // Expected uncompressed block size, from the file header.
    block_size: usize,
    // Reusable zstd decompression context.
    dctx: zstd_safe::DCtx<'static>,
    // Batch-decoded nucleotides for the whole block (filled by `decode_all`).
    dbuf: Vec<u8>,
    // Synthetic quality bytes used when the file stores no qualities.
    qbuf: Vec<u8>,
    // Fill byte for `qbuf`.
    default_quality_score: u8,
}
impl RecordBlock {
    /// Creates an empty block configured for the given encoding width and
    /// uncompressed block size (both taken from the VBQ file header).
    #[must_use]
    pub fn new(bitsize: BitSize, block_size: usize) -> Self {
        Self {
            bitsize,
            index: 0,
            block_size,
            records: Vec::default(),
            sequences: Vec::default(),
            rbuf: Vec::default(),
            dbuf: Vec::default(),
            dctx: zstd_safe::DCtx::create(),
            qbuf: Vec::default(),
            default_quality_score: DEFAULT_QUALITY_SCORE,
        }
    }
    /// Sets the byte used to synthesize quality strings when the file
    /// stores none, and clears `qbuf` so it is refilled with the new score.
    pub fn set_default_quality_score(&mut self, score: u8) {
        self.default_quality_score = score;
        self.qbuf.clear();
    }
    /// Number of records currently parsed into this block.
    #[must_use]
    pub fn n_records(&self) -> usize {
        self.records.len()
    }
    /// Iterates over the records of this block as [`RefRecord`]s.
    #[must_use]
    #[allow(clippy::iter_without_into_iter)]
    pub fn iter(&self) -> RecordBlockIter<'_> {
        RecordBlockIter::new(self)
    }
    /// Records the global index of the first record in this block.
    fn update_index(&mut self, index: usize) {
        self.index = index;
    }
    /// Resets parsed state so the block can be reused for the next read.
    /// `qbuf` is deliberately kept: it depends only on the default quality
    /// score and the longest record seen so far.
    pub fn clear(&mut self) {
        self.index = 0;
        self.records.clear();
        self.sequences.clear();
        self.dbuf.clear();
    }
    /// Copies an uncompressed block payload into `rbuf` and parses it.
    ///
    /// # Errors
    /// Returns `ReadError::PartialRecord` when `bytes` is not exactly one
    /// full block.
    fn ingest_bytes(
        &mut self,
        bytes: &[u8],
        has_quality: bool,
        has_header: bool,
        has_flags: bool,
    ) -> Result<()> {
        if bytes.len() != self.block_size {
            return Err(ReadError::PartialRecord(bytes.len()).into());
        }
        self.rbuf.clear();
        self.rbuf.extend_from_slice(bytes);
        self.parse_records(has_quality, has_header, has_flags);
        Ok(())
    }
    /// Decompresses a zstd-compressed block payload into `rbuf` and parses it.
    ///
    /// # Errors
    /// Returns an I/O error wrapping the zstd failure message when
    /// decompression fails, or `ReadError::PartialRecord` when the
    /// decompressed size does not match the expected block size.
    fn ingest_compressed_bytes(
        &mut self,
        bytes: &[u8],
        has_quality: bool,
        has_header: bool,
        has_flags: bool,
    ) -> Result<()> {
        self.rbuf.clear();
        // Ensure the whole uncompressed block fits; `reserve` is a no-op once
        // capacity is sufficient. (The previous code passed
        // `block_size - capacity` as the *additional* amount, which only
        // guarantees `capacity >= block_size - old_capacity` and could leave
        // the buffer short whenever 0 < capacity < block_size, making zstd
        // fail with a destination-too-small error.)
        self.rbuf.reserve(self.block_size);
        let bytes_read = self
            .dctx
            .decompress(&mut self.rbuf, bytes)
            .map_err(|code| std::io::Error::other(zstd_safe::get_error_name(code)))?;
        if bytes_read != self.block_size {
            return Err(ReadError::PartialRecord(bytes_read).into());
        }
        self.parse_records(has_quality, has_header, has_flags);
        Ok(())
    }
    /// Parses `rbuf` into per-record metadata, gathering packed sequence
    /// words into `sequences`.
    ///
    /// Per-record wire layout (little-endian u64 fields):
    /// `[flag]? [slen] [xlen] [s words...] [s qual]? [s hdr len + hdr]?
    /// [x words...] [x qual]? [x hdr len + hdr]?`
    ///
    /// NOTE(review): reads past the fixed-size prefix assume the record fits
    /// inside `rbuf`; corrupt input could panic on a slice bound — TODO
    /// consider explicit bounds checks.
    fn parse_records(&mut self, has_quality: bool, has_header: bool, has_flags: bool) {
        self.records.clear();
        self.sequences.clear();
        let mut pos = 0;
        let bytes = &self.rbuf;
        loop {
            // Fixed prefix: optional flag word plus slen + xlen.
            let min_header_size = if has_flags { 24 } else { 16 };
            if pos + min_header_size > bytes.len() {
                break;
            }
            let flag = if has_flags {
                let flag = LittleEndian::read_u64(&bytes[pos..pos + 8]);
                pos += 8;
                Some(flag)
            } else {
                None
            };
            let slen = LittleEndian::read_u64(&bytes[pos..pos + 8]);
            pos += 8;
            let xlen = LittleEndian::read_u64(&bytes[pos..pos + 8]);
            pos += 8;
            // A zero-length primary sequence marks the end of real records
            // (remaining bytes are block padding).
            if slen == 0 {
                break;
            }
            let s_seq_words = encoded_sequence_len(slen, self.bitsize);
            let x_seq_words = encoded_sequence_len(xlen, self.bitsize);
            // Primary sequence: copy packed words into the shared buffer and
            // remember where they landed.
            let s_seq_span = Span::new(self.sequences.len(), s_seq_words);
            for _ in 0..s_seq_words {
                let val = LittleEndian::read_u64(&bytes[pos..pos + 8]);
                self.sequences.push(val);
                pos += 8;
            }
            let s_qual_span = if has_quality {
                let span = Span::new(pos, slen as usize);
                pos += slen as usize;
                span
            } else {
                Span::new(0, 0)
            };
            let s_header_span = if has_header {
                // Headers are u64-length-prefixed byte strings.
                let header_len = LittleEndian::read_u64(&bytes[pos..pos + 8]) as usize;
                pos += 8;
                let span = Span::new(pos, header_len);
                pos += header_len;
                span
            } else {
                Span::new(0, 0)
            };
            // Extended (paired) sequence, same layout as the primary.
            let x_seq_span = Span::new(self.sequences.len(), x_seq_words);
            for _ in 0..x_seq_words {
                let val = LittleEndian::read_u64(&bytes[pos..pos + 8]);
                self.sequences.push(val);
                pos += 8;
            }
            let x_qual_span = if has_quality {
                let span = Span::new(pos, xlen as usize);
                pos += xlen as usize;
                span
            } else {
                Span::new(0, 0)
            };
            // An extended header is only present for paired records.
            let x_header_span = if has_header && xlen > 0 {
                let header_len = LittleEndian::read_u64(&bytes[pos..pos + 8]) as usize;
                pos += 8;
                let span = Span::new(pos, header_len);
                pos += header_len;
                span
            } else {
                Span::new(0, 0)
            };
            if !has_quality {
                // Grow the shared synthetic-quality buffer so iterator
                // borrows of length slen/xlen stay in bounds.
                let max_size = slen.max(xlen) as usize;
                if self.qbuf.len() < max_size {
                    self.qbuf.resize(max_size, self.default_quality_score);
                }
            }
            self.records.push(RecordMetadata {
                flag,
                slen,
                xlen,
                s_seq_span,
                s_qual_span,
                s_header_span,
                x_seq_span,
                x_qual_span,
                x_header_span,
                has_quality,
            });
        }
    }
    /// Batch-decodes every packed sequence word in the block into `dbuf`.
    ///
    /// Whole 64-bit words are decoded (32 or 16 bases each), so `dbuf`
    /// contains word-aligned runs; `get_decoded_s`/`get_decoded_x` slice
    /// out each record's exact length.
    pub fn decode_all(&mut self) -> Result<()> {
        if self.sequences.is_empty() {
            return Ok(());
        }
        self.dbuf.clear();
        match self.bitsize {
            BitSize::Two => {
                let num_bp = self.sequences.len() * 32;
                bitnuc::twobit::decode(&self.sequences, num_bp, &mut self.dbuf)
            }
            BitSize::Four => {
                let num_bp = self.sequences.len() * 16;
                bitnuc::fourbit::decode(&self.sequences, num_bp, &mut self.dbuf)
            }
        }?;
        Ok(())
    }
    /// Returns the batch-decoded primary sequence for `record_idx`, or
    /// `None` when the index is out of bounds or `decode_all` has not run.
    #[must_use]
    pub fn get_decoded_s(&self, record_idx: usize) -> Option<&[u8]> {
        let meta = self.records.get(record_idx)?;
        if self.dbuf.is_empty() {
            return None;
        }
        let bases_per_word = match self.bitsize {
            BitSize::Two => 32,
            BitSize::Four => 16,
        };
        // Word offset in `sequences` -> base offset in `dbuf`.
        let offset = meta.s_seq_span.offset * bases_per_word;
        let len = meta.slen as usize;
        Some(&self.dbuf[offset..offset + len])
    }
    /// Returns the batch-decoded extended sequence for `record_idx`.
    /// Unpaired records yield `Some(&[])`; otherwise behaves like
    /// [`Self::get_decoded_s`].
    #[must_use]
    pub fn get_decoded_x(&self, record_idx: usize) -> Option<&[u8]> {
        let meta = self.records.get(record_idx)?;
        if meta.xlen == 0 {
            return Some(&[]);
        }
        if self.dbuf.is_empty() {
            return None;
        }
        let bases_per_word = match self.bitsize {
            BitSize::Two => 32,
            BitSize::Four => 16,
        };
        let offset = meta.x_seq_span.offset * bases_per_word;
        let len = meta.xlen as usize;
        Some(&self.dbuf[offset..offset + len])
    }
}
/// Borrowing iterator over the records of a [`RecordBlock`].
pub struct RecordBlockIter<'a> {
    // Block being iterated.
    block: &'a RecordBlock,
    // Index of the next record to yield.
    pos: usize,
    // Scratch integer formatter for synthesizing numeric headers.
    header_buffer: itoa::Buffer,
    // Shared default-quality buffer borrowed from the block.
    qbuf: &'a [u8],
}
impl<'a> RecordBlockIter<'a> {
    /// Builds an iterator positioned at the first record of `block`.
    #[must_use]
    pub fn new(block: &'a RecordBlock) -> Self {
        let qbuf = block.qbuf.as_slice();
        Self {
            pos: 0,
            header_buffer: itoa::Buffer::new(),
            qbuf,
            block,
        }
    }
}
impl<'a> Iterator for RecordBlockIter<'a> {
    type Item = RefRecord<'a>;
    /// Yields the next record, synthesizing a decimal record-index header
    /// when the file stored no headers and borrowing the shared
    /// default-quality buffer when the file stored no qualities.
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.block.records.len() {
            return None;
        }
        let meta = &self.block.records[self.pos];
        // Global record index = block's first-record index + offset in block.
        let index = (self.block.index + self.pos) as u64;
        let index_in_block = self.pos;
        // 20 bytes is enough for the decimal digits of any u64.
        let mut header_buf = [0; 20];
        let mut header_len = 0;
        if meta.s_header_span.len == 0 && meta.x_header_span.len == 0 {
            // Neither mate has a stored header: format the global index.
            let header_str = self.header_buffer.format(index);
            header_len = header_str.len();
            header_buf[..header_len].copy_from_slice(header_str.as_bytes());
        }
        let (squal, xqual) = if meta.has_quality {
            // Qualities are stored in the raw block bytes.
            (
                meta.s_qual_span.slice(&self.block.rbuf),
                meta.x_qual_span.slice(&self.block.rbuf),
            )
        } else {
            // Borrow length-matched prefixes of the synthetic quality buffer.
            (
                &self.qbuf[..meta.slen as usize],
                &self.qbuf[..meta.xlen as usize],
            )
        };
        {
            self.pos += 1;
        }
        Some(RefRecord {
            block: self.block,
            bitsize: self.block.bitsize,
            index,
            index_in_block,
            flag: meta.flag,
            slen: meta.slen,
            xlen: meta.xlen,
            sbuf: meta.s_seq_span.slice_u64(&self.block.sequences),
            xbuf: meta.x_seq_span.slice_u64(&self.block.sequences),
            squal,
            xqual,
            sheader: meta.s_header_span.slice(&self.block.rbuf),
            xheader: meta.x_header_span.slice(&self.block.rbuf),
            header_buf,
            header_len,
        })
    }
}
/// A borrowed view of a single record inside a [`RecordBlock`].
pub struct RefRecord<'a> {
    // Owning block, used for batch-decoded sequence lookups.
    block: &'a RecordBlock,
    // Sequence encoding width.
    bitsize: BitSize,
    // Global record index across the whole file.
    index: u64,
    // Position of this record within its block.
    index_in_block: usize,
    // Optional per-record flag word.
    flag: Option<u64>,
    // Primary / extended sequence lengths in bases.
    slen: u64,
    xlen: u64,
    // Packed sequence words for the primary / extended sequences.
    sbuf: &'a [u64],
    xbuf: &'a [u64],
    // Quality bytes (stored or synthesized) for each mate.
    squal: &'a [u8],
    xqual: &'a [u8],
    // Stored headers; empty when the file has none.
    sheader: &'a [u8],
    xheader: &'a [u8],
    // Synthesized decimal-index header (used when no header was stored).
    header_buf: [u8; 20],
    header_len: usize,
}
// Trait plumbing: mostly field accessors over the borrowed block data.
impl BinseqRecord for RefRecord<'_> {
    fn bitsize(&self) -> BitSize {
        self.bitsize
    }
    fn index(&self) -> u64 {
        self.index
    }
    fn flag(&self) -> Option<u64> {
        self.flag
    }
    fn slen(&self) -> u64 {
        self.slen
    }
    fn xlen(&self) -> u64 {
        self.xlen
    }
    fn sbuf(&self) -> &[u64] {
        self.sbuf
    }
    fn xbuf(&self) -> &[u64] {
        self.xbuf
    }
    fn squal(&self) -> &[u8] {
        self.squal
    }
    fn xqual(&self) -> &[u8] {
        self.xqual
    }
    // Fall back to the synthesized numeric header when none was stored.
    fn sheader(&self) -> &[u8] {
        match self.sheader {
            [] => &self.header_buf[..self.header_len],
            stored => stored,
        }
    }
    fn xheader(&self) -> &[u8] {
        match self.xheader {
            [] => &self.header_buf[..self.header_len],
            stored => stored,
        }
    }
    // Prefer the block's batch-decoded buffer; decode on demand otherwise.
    fn decode_s(&self, buf: &mut Vec<u8>) -> Result<()> {
        match self.block.get_decoded_s(self.index_in_block) {
            Some(decoded) => buf.extend_from_slice(decoded),
            None => {
                self.bitsize()
                    .decode(self.sbuf(), self.slen() as usize, buf)?;
            }
        }
        Ok(())
    }
    fn decode_x(&self, buf: &mut Vec<u8>) -> Result<()> {
        match self.block.get_decoded_x(self.index_in_block) {
            Some(decoded) => buf.extend_from_slice(decoded),
            None => {
                self.bitsize()
                    .decode(self.xbuf(), self.xlen() as usize, buf)?;
            }
        }
        Ok(())
    }
    // These two panic unless `decode_all` was run on the block first.
    fn sseq(&self) -> &[u8] {
        self.block
            .get_decoded_s(self.index_in_block)
            .expect("Reader was built without batch-decoding")
    }
    fn xseq(&self) -> &[u8] {
        self.block
            .get_decoded_x(self.index_in_block)
            .expect("Reader was built without batch-decoding")
    }
}
/// Memory-mapped reader for a VBQ file.
pub struct MmapReader {
    // Shared read-only mapping of the whole file.
    mmap: Arc<Mmap>,
    // Parsed file header.
    header: FileHeader,
    // Byte offset of the next unread block.
    pos: usize,
    // Running count of records seen so far (first index of the next block).
    total: usize,
    // Whether parallel processing should batch-decode each block.
    decode_block: bool,
    // Score byte propagated to new blocks for synthesized qualities.
    default_quality_score: u8,
}
impl MmapReader {
    /// Opens a VBQ file, memory-maps it, and parses the file header.
    ///
    /// # Errors
    /// Fails when the path is not a regular file, the file is too short to
    /// contain a header, or the header bytes are invalid.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(&path)?;
        if !file.metadata()?.is_file() {
            return Err(ReadError::InvalidFileType.into());
        }
        let mmap = unsafe { Mmap::map(&file)? };
        // Guard against truncated files: slicing SIZE_HEADER bytes out of a
        // shorter mapping would panic.
        if mmap.len() < SIZE_HEADER {
            return Err(ReadError::UnexpectedEndOfFile(mmap.len()).into());
        }
        let header = {
            let mut header_bytes = [0u8; SIZE_HEADER];
            header_bytes.copy_from_slice(&mmap[..SIZE_HEADER]);
            FileHeader::from_bytes(&header_bytes)?
        };
        Ok(Self {
            mmap: Arc::new(mmap),
            header,
            pos: SIZE_HEADER,
            total: 0,
            decode_block: true,
            default_quality_score: DEFAULT_QUALITY_SCORE,
        })
    }
    /// Sets the score byte used for synthesized qualities in blocks created
    /// by [`Self::new_block`].
    pub fn set_default_quality_score(&mut self, score: u8) {
        self.default_quality_score = score;
    }
    /// Creates an empty [`RecordBlock`] sized/configured for this file.
    #[must_use]
    pub fn new_block(&self) -> RecordBlock {
        let mut block = RecordBlock::new(self.header.bits, self.header.block as usize);
        block.set_default_quality_score(self.default_quality_score);
        block
    }
    /// Controls whether parallel processing batch-decodes each block.
    pub fn set_decode_block(&mut self, decode_block: bool) {
        self.decode_block = decode_block;
    }
    /// Returns a copy of the parsed file header.
    #[must_use]
    pub fn header(&self) -> FileHeader {
        self.header
    }
    /// Whether the file stores paired (primary + extended) sequences.
    #[must_use]
    pub fn is_paired(&self) -> bool {
        self.header.is_paired()
    }
    /// Reads the next block of records into `block`.
    ///
    /// Returns `Ok(true)` when a block was read, `Ok(false)` at end of
    /// data — including when the trailing index section is reached.
    ///
    /// # Errors
    /// Fails on a malformed block header, a truncated payload, or a payload
    /// that does not parse/decompress to the expected size.
    pub fn read_block_into(&mut self, block: &mut RecordBlock) -> Result<bool> {
        block.clear();
        if self.pos + SIZE_BLOCK_HEADER > self.mmap.len() {
            return Ok(false);
        }
        let mut header_bytes = [0u8; SIZE_BLOCK_HEADER];
        header_bytes.copy_from_slice(&self.mmap[self.pos..self.pos + SIZE_BLOCK_HEADER]);
        let header = match BlockHeader::from_bytes(&header_bytes) {
            Ok(header) => {
                self.pos += SIZE_BLOCK_HEADER;
                header
            }
            Err(e) => {
                // Not a block header: these bytes may be the start of the
                // trailing index section, which signals end-of-records.
                if self.pos + INDEX_HEADER_SIZE > self.mmap.len() {
                    // Too short to hold an index header; report the original
                    // block-header error instead of panicking on the slice.
                    return Err(e);
                }
                let mut index_header_bytes = [0u8; INDEX_HEADER_SIZE];
                index_header_bytes
                    .copy_from_slice(&self.mmap[self.pos..self.pos + INDEX_HEADER_SIZE]);
                if IndexHeader::from_bytes(&index_header_bytes).is_ok() {
                    return Ok(false);
                }
                return Err(e);
            }
        };
        // Compressed blocks store their payload size in the block header;
        // uncompressed blocks are always exactly `header.block` bytes.
        let rbound = if self.header.compressed {
            header.size as usize
        } else {
            self.header.block as usize
        };
        if self.pos + rbound > self.mmap.len() {
            return Err(ReadError::UnexpectedEndOfFile(self.pos).into());
        }
        let block_buffer = &self.mmap[self.pos..self.pos + rbound];
        if self.header.compressed {
            block.ingest_compressed_bytes(
                block_buffer,
                self.header.qual,
                self.header.headers,
                self.header.flags,
            )?;
        } else {
            block.ingest_bytes(
                block_buffer,
                self.header.qual,
                self.header.headers,
                self.header.flags,
            )?;
        }
        block.update_index(self.total);
        self.pos += rbound;
        self.total += header.records as usize;
        Ok(true)
    }
    /// Loads the block index stored at the end of the file.
    ///
    /// Trailer layout: `[index bytes][index_size: u64][magic: u64]`.
    ///
    /// # Errors
    /// Fails when the file is too short to hold the trailer, the end magic
    /// is missing, or the stored index size is inconsistent with the file.
    pub fn load_index(&self) -> Result<BlockIndex> {
        // Need at least the 16-byte trailer before doing any arithmetic.
        if self.mmap.len() < 16 {
            return Err(ReadError::MissingIndexEndMagic.into());
        }
        let start_pos_magic = self.mmap.len() - 8;
        let start_pos_index_size = start_pos_magic - 8;
        let magic = LittleEndian::read_u64(&self.mmap[start_pos_magic..]);
        if magic != INDEX_END_MAGIC {
            return Err(ReadError::MissingIndexEndMagic.into());
        }
        let index_size = LittleEndian::read_u64(&self.mmap[start_pos_index_size..start_pos_magic]);
        // A corrupt size larger than the file would underflow the subtraction;
        // report it as a missing/invalid index rather than panicking.
        let start_pos_index = start_pos_index_size
            .checked_sub(index_size as usize)
            .ok_or(ReadError::MissingIndexEndMagic)?;
        let index_bytes = &self.mmap[start_pos_index..start_pos_index_size];
        BlockIndex::from_bytes(index_bytes)
    }
    /// Total number of records in the file, according to the block index.
    ///
    /// # Errors
    /// Propagates any failure from [`Self::load_index`].
    pub fn num_records(&self) -> Result<usize> {
        let index = self.load_index()?;
        Ok(index.num_records())
    }
}
impl ParallelReader for MmapReader {
    /// Processes every record in the file with `processor` across
    /// `num_threads` worker threads.
    fn process_parallel<P: ParallelProcessor + Clone + 'static>(
        self,
        processor: P,
        num_threads: usize,
    ) -> Result<()> {
        let num_records = self.num_records()?;
        self.process_parallel_range(processor, num_threads, 0..num_records)
    }
    /// Processes records whose global index falls in `range`, splitting the
    /// relevant index blocks contiguously across worker threads.
    fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
        self,
        processor: P,
        num_threads: usize,
        range: Range<usize>,
    ) -> Result<()> {
        // 0 means "use all CPUs"; otherwise cap at the CPU count.
        let num_threads = if num_threads == 0 {
            num_cpus::get()
        } else {
            num_threads.min(num_cpus::get())
        };
        let index = self.load_index()?;
        let total_records = index.num_records();
        self.validate_range(total_records, &range)?;
        // Keep only blocks whose record interval overlaps the requested range.
        let relevant_blocks = index
            .ranges()
            .iter()
            .filter(|r| {
                let iv_start = r.cumulative_records as usize;
                let iv_end = (r.cumulative_records + u64::from(r.block_records)) as usize;
                iv_start < range.end && iv_end > range.start
            })
            .copied()
            .collect::<Vec<_>>();
        if relevant_blocks.is_empty() {
            return Ok(()); }
        // Contiguous chunking: each thread owns a consecutive run of blocks.
        let blocks_per_thread = relevant_blocks.len().div_ceil(num_threads);
        let mmap = Arc::clone(&self.mmap);
        let header = self.header;
        let mut handles = Vec::new();
        for thread_id in 0..num_threads {
            let start_block_idx = thread_id * blocks_per_thread;
            let end_block_idx =
                std::cmp::min((thread_id + 1) * blocks_per_thread, relevant_blocks.len());
            // More threads than chunks: the surplus threads get no work.
            if start_block_idx >= relevant_blocks.len() {
                continue;
            }
            let mmap = Arc::clone(&mmap);
            let mut proc = processor.clone();
            proc.set_tid(thread_id);
            let thread_blocks: Vec<BlockRange> =
                relevant_blocks[start_block_idx..end_block_idx].to_vec();
            // NOTE(review): the closure reads `self.decode_block` (a Copy
            // bool), relying on per-field closure capture — confirm the crate
            // edition is 2021+ so each thread captures only that field.
            let handle = std::thread::spawn(move || -> Result<()> {
                let mut record_block = RecordBlock::new(header.bits, header.block as usize);
                for block_range in thread_blocks {
                    record_block.clear();
                    // Skip past the block header to the payload.
                    let block_start = block_range.start_offset as usize + SIZE_BLOCK_HEADER;
                    let block_data = &mmap[block_start..block_start + block_range.len as usize];
                    if header.compressed {
                        record_block.ingest_compressed_bytes(
                            block_data,
                            header.qual,
                            header.headers,
                            header.flags,
                        )?;
                    } else {
                        record_block.ingest_bytes(
                            block_data,
                            header.qual,
                            header.headers,
                            header.flags,
                        )?;
                    }
                    record_block.update_index(block_range.cumulative_records as usize);
                    if self.decode_block {
                        record_block.decode_all()?;
                    }
                    // Boundary blocks may contain records outside `range`;
                    // filter per record.
                    for record in record_block.iter() {
                        let global_record_idx = record.index as usize;
                        if global_record_idx >= range.start && global_record_idx < range.end {
                            proc.process_record(record)?;
                        }
                    }
                    proc.on_batch_complete()?;
                }
                Ok(())
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap()?;
        }
        Ok(())
    }
}
// Tests exercise the reader against a sample fixture; most require
// `./data/subset.vbq` to exist relative to the crate root.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BinseqRecord;
    const TEST_VBQ_FILE: &str = "./data/subset.vbq";
    #[test]
    fn test_mmap_reader_new() {
        let reader = MmapReader::new(TEST_VBQ_FILE);
        assert!(reader.is_ok(), "Failed to create VBQ reader");
    }
    #[test]
    fn test_mmap_reader_num_records() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let num_records = reader.num_records();
        assert!(num_records.is_ok(), "Failed to get num_records");
        assert!(num_records.unwrap() > 0, "Expected non-zero records");
    }
    #[test]
    fn test_mmap_reader_is_paired() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let is_paired = reader.is_paired();
        // Tautology: only verifies the call does not panic.
        assert!(is_paired || !is_paired);
    }
    #[test]
    fn test_mmap_reader_header_access() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let header = &reader.header;
        assert!(header.block > 0, "Expected non-zero block size");
        // 0x51455356 is the little-endian bytes of ASCII "VSEQ".
        assert_eq!(header.magic, 0x51455356, "Expected VSEQ magic number");
    }
    #[test]
    fn test_new_block() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let block = reader.new_block();
        assert_eq!(block.bitsize, reader.header.bits);
        assert!(block.n_records() == 0, "New block should be empty");
    }
    #[test]
    fn test_record_block_creation() {
        let block = RecordBlock::new(BitSize::Two, 1024);
        assert_eq!(block.bitsize, BitSize::Two);
        assert_eq!(block.n_records(), 0);
    }
    #[test]
    fn test_record_block_clear() {
        let mut block = RecordBlock::new(BitSize::Two, 1024);
        assert_eq!(block.n_records(), 0);
        // Clearing an already-empty block is a no-op.
        block.clear();
        assert_eq!(block.n_records(), 0);
    }
    #[test]
    fn test_record_block_set_default_quality() {
        let mut block = RecordBlock::new(BitSize::Two, 1024);
        let custom_score = 42u8;
        block.set_default_quality_score(custom_score);
        assert_eq!(block.default_quality_score, custom_score);
    }
    #[test]
    fn test_read_block_into() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        let result = reader.read_block_into(&mut block);
        assert!(result.is_ok(), "Failed to read block");
        if result.unwrap() {
            assert!(block.n_records() > 0, "Block should contain records");
        }
    }
    #[test]
    fn test_read_multiple_blocks() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        let mut blocks_read = 0;
        let max_blocks = 5;
        while reader.read_block_into(&mut block).unwrap() && blocks_read < max_blocks {
            assert!(block.n_records() > 0, "Each block should have records");
            blocks_read += 1;
        }
        assert!(blocks_read > 0, "Should read at least one block");
    }
    #[test]
    fn test_block_iteration() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            let num_records = block.n_records();
            let mut count = 0;
            for record in block.iter() {
                assert!(record.slen() > 0, "Record should have non-zero length");
                count += 1;
            }
            assert_eq!(count, num_records, "Iterator should yield all records");
        }
    }
    #[test]
    fn test_record_sequence_data() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            // sseq() requires a prior batch decode.
            block.decode_all().unwrap();
            if let Some(record) = block.iter().next() {
                let sseq = record.sseq();
                assert!(!sseq.is_empty(), "Sequence should not be empty");
                let slen = record.slen();
                assert_eq!(sseq.len(), slen as usize, "Sequence length mismatch");
            }
        }
    }
    #[test]
    fn test_record_header_data() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            for record in block.iter() {
                let sheader = record.sheader();
                if !sheader.is_empty() {
                    // Only checks that UTF-8 validation can be attempted.
                    let _ = std::str::from_utf8(sheader);
                }
            }
        }
    }
    #[test]
    fn test_record_quality_data() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            for record in block.iter() {
                let squal = record.squal();
                let slen = record.slen() as usize;
                if !squal.is_empty() {
                    assert_eq!(
                        squal.len(),
                        slen,
                        "Quality length should match sequence length"
                    );
                }
            }
        }
    }
    #[test]
    fn test_record_bitsize() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            for record in block.iter() {
                let bitsize = record.bitsize();
                assert!(
                    matches!(bitsize, BitSize::Two | BitSize::Four),
                    "Bitsize should be Two or Four"
                );
            }
        }
    }
    #[test]
    fn test_set_default_quality_score() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let custom_score = 42u8;
        reader.set_default_quality_score(custom_score);
        assert_eq!(reader.default_quality_score, custom_score);
        // The score must propagate to newly created blocks.
        let block = reader.new_block();
        assert_eq!(block.default_quality_score, custom_score);
    }
    #[test]
    fn test_set_decode_block() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        // Smoke test: toggling must not panic.
        reader.set_decode_block(true);
        reader.set_decode_block(false);
    }
    #[test]
    fn test_decode_block_affects_reading() {
        let mut reader1 = MmapReader::new(TEST_VBQ_FILE).unwrap();
        reader1.set_decode_block(true);
        let mut block1 = reader1.new_block();
        let mut reader2 = MmapReader::new(TEST_VBQ_FILE).unwrap();
        reader2.set_decode_block(false);
        let mut block2 = reader2.new_block();
        // Sequential reads succeed regardless of the decode_block flag
        // (it only affects parallel processing).
        let result1 = reader1.read_block_into(&mut block1);
        let result2 = reader2.read_block_into(&mut block2);
        assert!(result1.is_ok() && result2.is_ok());
    }
    // Counts processed records across worker threads via a shared mutex.
    #[derive(Clone, Default)]
    struct VbqCountingProcessor {
        count: Arc<std::sync::Mutex<usize>>,
    }
    impl ParallelProcessor for VbqCountingProcessor {
        fn process_record<R: BinseqRecord>(&mut self, _record: R) -> Result<()> {
            *self.count.lock().unwrap() += 1;
            Ok(())
        }
    }
    #[test]
    fn test_parallel_processing() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let num_records_result = reader.num_records();
        if num_records_result.is_err() {
            // No index in the fixture: nothing to verify.
            return;
        }
        let num_records = num_records_result.unwrap();
        let processor = VbqCountingProcessor::default();
        let result = reader.process_parallel(processor.clone(), 2);
        if result.is_ok() {
            let final_count = *processor.count.lock().unwrap();
            assert_eq!(final_count, num_records,);
        }
    }
    #[test]
    fn test_parallel_processing_range() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let num_records_result = reader.num_records();
        if num_records_result.is_err() {
            return;
        }
        let num_records = num_records_result.unwrap();
        if num_records >= 100 {
            let start = 10;
            let end = 50;
            let expected_count = end - start;
            let processor = VbqCountingProcessor::default();
            let result = reader.process_parallel_range(processor.clone(), 2, start..end);
            if result.is_ok() {
                let final_count = *processor.count.lock().unwrap();
                assert_eq!(
                    final_count, expected_count,
                    "Processed count should match expected range"
                );
            }
        }
    }
    #[test]
    fn test_span_creation() {
        let span = Span::new(10, 20);
        assert_eq!(span.offset, 10);
        assert_eq!(span.len, 20);
    }
    #[test]
    fn test_span_default() {
        let span = Span::default();
        assert_eq!(span.offset, 0);
        assert_eq!(span.len, 0);
    }
    #[test]
    fn test_nonexistent_file() {
        let result = MmapReader::new("./data/nonexistent.vbq");
        assert!(result.is_err(), "Should fail on nonexistent file");
    }
    #[test]
    fn test_invalid_file_format() {
        // Any non-VBQ file should be rejected by header validation.
        let result = MmapReader::new("./Cargo.toml");
        assert!(result.is_err(), "Should fail on invalid file format");
    }
    #[test]
    fn test_load_index() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let index_result = reader.load_index();
        assert!(index_result.is_ok(), "Should be able to load index");
        let index = index_result.unwrap();
        assert!(index.num_records() > 0, "Index should have records");
    }
    #[test]
    fn test_index_consistency() {
        let reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let num_records_from_reader = reader.num_records().unwrap();
        let index = reader.load_index().unwrap();
        let num_records_from_index = index.num_records();
        assert_eq!(
            num_records_from_reader, num_records_from_index,
            "Reader and index should report same number of records"
        );
    }
    #[test]
    fn test_get_decoded_s() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        reader.set_decode_block(true);
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() && block.n_records() > 0 {
            // Note: decode_all() was not called, so None is acceptable here.
            let decoded = block.get_decoded_s(0);
            if let Some(seq) = decoded {
                assert!(!seq.is_empty(), "Decoded sequence should not be empty");
            }
        }
    }
    #[test]
    fn test_get_decoded_x() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        reader.set_decode_block(true);
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() && block.n_records() > 0 {
            // Smoke test only: any outcome (None or Some) is accepted.
            let decoded = block.get_decoded_x(0);
            let _ = decoded;
        }
    }
    #[test]
    fn test_get_decoded_out_of_bounds() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            let num_records = block.n_records();
            let decoded = block.get_decoded_s(num_records + 100);
            assert!(decoded.is_none(), "Should return None for out of bounds");
        }
    }
    #[test]
    fn test_encoded_sequence_len_two_bit() {
        // 2-bit packing: 32 bases per u64 word, rounded up.
        assert_eq!(encoded_sequence_len(32, BitSize::Two), 1);
        assert_eq!(encoded_sequence_len(64, BitSize::Two), 2);
        assert_eq!(encoded_sequence_len(33, BitSize::Two), 2);
        assert_eq!(encoded_sequence_len(1, BitSize::Two), 1);
    }
    #[test]
    fn test_encoded_sequence_len_four_bit() {
        // 4-bit packing: 16 bases per u64 word, rounded up.
        assert_eq!(encoded_sequence_len(16, BitSize::Four), 1);
        assert_eq!(encoded_sequence_len(32, BitSize::Four), 2);
        assert_eq!(encoded_sequence_len(17, BitSize::Four), 2);
        assert_eq!(encoded_sequence_len(1, BitSize::Four), 1);
    }
    #[test]
    fn test_record_block_iter_creation() {
        let block = RecordBlock::new(BitSize::Two, 1024);
        let iter = RecordBlockIter::new(&block);
        assert_eq!(iter.count(), 0);
    }
    #[test]
    fn test_record_iteration_multiple_times() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() && block.n_records() > 0 {
            let num_records = block.n_records();
            // iter() borrows, so a block can be iterated repeatedly.
            let count1 = block.iter().count();
            assert_eq!(count1, num_records);
            let count2 = block.iter().count();
            assert_eq!(count2, num_records);
        }
    }
    #[test]
    fn test_paired_record_data() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        if reader.is_paired() {
            let mut block = reader.new_block();
            if reader.read_block_into(&mut block).unwrap() {
                block.decode_all().unwrap();
                for record in block.iter() {
                    let xlen = record.xlen();
                    if xlen > 0 {
                        let xseq = record.xseq();
                        assert_eq!(
                            xseq.len(),
                            xlen as usize,
                            "Extended sequence length should match xlen"
                        );
                    }
                }
            }
        }
    }
    #[test]
    fn test_empty_block_iteration() {
        let block = RecordBlock::new(BitSize::Two, 1024);
        let mut count = 0;
        for _ in block.iter() {
            count += 1;
        }
        assert_eq!(count, 0, "Empty block should yield no records");
    }
    #[test]
    fn test_reader_reset_by_new_block() {
        let mut reader = MmapReader::new(TEST_VBQ_FILE).unwrap();
        let mut block = reader.new_block();
        if reader.read_block_into(&mut block).unwrap() {
            let first_count = block.n_records();
            // Reusing the same block for a second read must work.
            if reader.read_block_into(&mut block).unwrap() {
                let second_count = block.n_records();
                assert!(first_count > 0 && second_count > 0);
            }
        }
    }
}