use std::ops::Range;
use std::path::Path;
use crate::{
BinseqRecord, Result, bq, cbq,
error::{ExtensionError, ReadError},
vbq,
};
pub enum BinseqReader {
Bq(bq::MmapReader),
Vbq(vbq::MmapReader),
Cbq(cbq::MmapReader),
}
impl BinseqReader {
pub fn new(path: &str) -> Result<Self> {
let pathbuf = Path::new(path);
match pathbuf.extension() {
Some(ext) => match ext.to_str() {
Some("bq") => Ok(Self::Bq(bq::MmapReader::new(path)?)),
Some("vbq") => Ok(Self::Vbq(vbq::MmapReader::new(path)?)),
Some("cbq") => Ok(Self::Cbq(cbq::MmapReader::new(path)?)),
_ => Err(ExtensionError::UnsupportedExtension(path.to_string()).into()),
},
None => Err(ExtensionError::UnsupportedExtension(path.to_string()).into()),
}
}
pub fn set_decode_block(&mut self, decode_block: bool) {
match self {
Self::Bq(_) | Self::Cbq(_) => {
}
Self::Vbq(reader) => reader.set_decode_block(decode_block),
}
}
pub fn set_default_quality_score(&mut self, score: u8) {
match self {
Self::Bq(reader) => reader.set_default_quality_score(score),
Self::Vbq(reader) => reader.set_default_quality_score(score),
Self::Cbq(reader) => reader.set_default_quality_score(score),
}
}
#[must_use]
pub fn is_paired(&self) -> bool {
match self {
Self::Bq(reader) => reader.is_paired(),
Self::Vbq(reader) => reader.is_paired(),
Self::Cbq(reader) => reader.is_paired(),
}
}
pub fn num_records(&self) -> Result<usize> {
match self {
Self::Bq(reader) => Ok(reader.num_records()),
Self::Vbq(reader) => reader.num_records(),
Self::Cbq(reader) => Ok(reader.num_records()),
}
}
pub fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
self,
processor: P,
num_threads: usize,
range: Range<usize>,
) -> Result<()> {
match self {
Self::Bq(reader) => reader.process_parallel_range(processor, num_threads, range),
Self::Vbq(reader) => reader.process_parallel_range(processor, num_threads, range),
Self::Cbq(reader) => reader.process_parallel_range(processor, num_threads, range),
}
}
}
impl ParallelReader for BinseqReader {
fn process_parallel<P: ParallelProcessor + Clone + 'static>(
self,
processor: P,
num_threads: usize,
) -> Result<()> {
let num_records = self.num_records()?;
self.process_parallel_range(processor, num_threads, 0..num_records)
}
fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
self,
processor: P,
num_threads: usize,
range: Range<usize>,
) -> Result<()> {
match self {
Self::Bq(reader) => reader.process_parallel_range(processor, num_threads, range),
Self::Vbq(reader) => reader.process_parallel_range(processor, num_threads, range),
Self::Cbq(reader) => reader.process_parallel_range(processor, num_threads, range),
}
}
}
pub trait ParallelReader {
fn process_parallel<P: ParallelProcessor + Clone + 'static>(
self,
processor: P,
num_threads: usize,
) -> Result<()>;
fn process_parallel_range<P: ParallelProcessor + Clone + 'static>(
self,
processor: P,
num_threads: usize,
range: Range<usize>,
) -> Result<()>;
fn validate_range(&self, total_records: usize, range: &Range<usize>) -> Result<()> {
if range.start >= total_records {
Err(ReadError::OutOfRange {
requested_index: range.start,
max_index: total_records,
}
.into())
} else if range.end > total_records {
Err(ReadError::OutOfRange {
requested_index: range.end,
max_index: total_records,
}
.into())
} else if range.start > range.end {
Err(ReadError::InvalidRange {
start: range.start,
end: range.end,
}
.into())
} else {
Ok(())
}
}
}
pub trait ParallelProcessor: Send + Clone {
fn process_record<R: BinseqRecord>(&mut self, record: R) -> Result<()>;
#[allow(unused_variables)]
fn on_batch_complete(&mut self) -> Result<()> {
Ok(())
}
#[allow(unused_variables)]
fn set_tid(&mut self, _tid: usize) {
}
fn get_tid(&self) -> Option<usize> {
None
}
}
#[cfg(test)]
mod testing {
use std::sync::Arc;
use parking_lot::Mutex;
use super::*;
#[derive(Clone, Default)]
struct TestProcessor {
pub n_records: Arc<Mutex<usize>>,
}
impl ParallelProcessor for TestProcessor {
fn process_record<R: BinseqRecord>(&mut self, _record: R) -> Result<()> {
*self.n_records.lock() += 1;
Ok(())
}
}
#[test]
fn test_parallel_processor() {
for ext in ["bq", "vbq", "cbq"] {
eprintln!("Testing {}", ext);
let reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
let num_records = reader.num_records().unwrap();
let processor = TestProcessor::default();
assert!(reader.process_parallel(processor.clone(), 0).is_ok());
assert_eq!(*processor.n_records.lock(), num_records);
}
}
#[test]
fn test_parallel_processor_range() {
for ext in ["bq", "vbq", "cbq"] {
eprintln!("Testing {}", ext);
let reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
let processor = TestProcessor::default();
assert!(
reader
.process_parallel_range(processor.clone(), 0, 0..10)
.is_ok()
);
assert_eq!(*processor.n_records.lock(), 10);
}
}
#[test]
fn test_parallel_processor_out_of_range_start() {
for ext in ["bq", "vbq", "cbq"] {
eprintln!("Testing {}", ext);
let reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
let processor = TestProcessor::default();
assert!(
reader
.process_parallel_range(processor, 0, 1_000_000..1_000_001)
.is_err()
);
}
}
#[test]
fn test_parallel_processor_out_of_range_end() {
for ext in ["bq", "vbq", "cbq"] {
eprintln!("Testing {}", ext);
let reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
let processor = TestProcessor::default();
assert!(
reader
.process_parallel_range(processor, 0, 0..1_000_000)
.is_err()
);
}
}
#[test]
fn test_parallel_processor_backwards_range() {
for ext in ["bq", "vbq", "cbq"] {
eprintln!("Testing {}", ext);
let reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
let processor = TestProcessor::default();
assert!(reader.process_parallel_range(processor, 0, 100..0).is_err());
}
}
#[test]
fn test_set_decode_block() {
for ext in ["bq", "vbq", "cbq"] {
for opt in [true, false] {
eprintln!("Testing {} - decode {}", ext, opt);
let mut reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
reader.set_decode_block(opt);
let num_records = reader.num_records().unwrap();
let processor = TestProcessor::default();
assert!(reader.process_parallel(processor.clone(), 0).is_ok());
assert_eq!(*processor.n_records.lock(), num_records);
}
}
}
#[test]
fn test_set_default_quality_score() {
for ext in ["bq", "vbq", "cbq"] {
let default_score = b'#';
eprintln!("Testing {} - default score: {}", ext, default_score);
let mut reader = BinseqReader::new(&format!("./data/subset.{}", ext)).unwrap();
reader.set_default_quality_score(default_score);
let num_records = reader.num_records().unwrap();
let processor = TestProcessor::default();
assert!(reader.process_parallel(processor.clone(), 0).is_ok());
assert_eq!(*processor.n_records.lock(), num_records);
}
}
}