use std::{
fs::File,
io::{stdout, BufWriter, Write},
sync::Arc,
time::Instant,
};
use anyhow::Result;
use parking_lot::Mutex;
use vbinseq::{MmapReader, ParallelProcessor, RefRecord};
#[derive(Clone)]
pub struct Decoder {
buffer: Vec<u8>,
dbuf: Vec<u8>,
local_records: usize,
quality: Vec<u8>,
global_buffer: Arc<Mutex<Box<dyn Write + Send>>>,
num_records: Arc<Mutex<usize>>,
}
impl Decoder {
pub fn new(writer: Box<dyn Write + Send>) -> Self {
let global_buffer = Arc::new(Mutex::new(writer));
Decoder {
buffer: Vec::new(),
dbuf: Vec::new(),
local_records: 0,
quality: Vec::new(),
global_buffer,
num_records: Arc::new(Mutex::new(0)),
}
}
pub fn num_records(&self) -> usize {
*self.num_records.lock()
}
}
impl ParallelProcessor for Decoder {
fn process_record(&mut self, record: RefRecord) -> vbinseq::Result<()> {
self.dbuf.clear();
record.decode_s(&mut self.dbuf)?;
let qual_buf = if record.squal().is_empty() {
if self.quality.len() < record.slen() as usize {
self.quality.resize(record.slen() as usize, b'?');
}
&self.quality[0..record.slen() as usize]
} else {
record.squal()
};
write_fastq(&mut self.buffer, record.index(), &self.dbuf, qual_buf)?;
self.local_records += 1;
Ok(())
}
fn on_batch_complete(&mut self) -> vbinseq::Result<()> {
{
let mut lock = self.global_buffer.lock();
lock.write_all(&self.buffer)?;
lock.flush()?;
}
{
let mut num_records = self.num_records.lock();
*num_records += self.local_records;
}
self.buffer.clear();
self.local_records = 0;
Ok(())
}
}
fn write_fastq<W: Write>(
buffer: &mut W,
index: u64,
sequence: &[u8],
quality: &[u8],
) -> Result<(), std::io::Error> {
writeln!(buffer, "@seq.{index}")?;
buffer.write_all(sequence)?;
buffer.write_all(b"\n+\n")?;
buffer.write_all(quality)?;
buffer.write_all(b"\n")?;
Ok(())
}
fn match_output(path: Option<&str>) -> Result<Box<dyn Write + Send>> {
match path {
Some(path) => {
let writer = File::create(path).map(BufWriter::new)?;
Ok(Box::new(writer))
}
None => {
let stdout = stdout();
Ok(Box::new(BufWriter::new(stdout)))
}
}
}
fn main() -> Result<()> {
let test_file = std::env::args()
.nth(1)
.unwrap_or("./data/out.vbq".to_string());
let n_threads = std::env::args()
.nth(2)
.unwrap_or("8".to_string())
.parse::<usize>()?;
let writer = match_output(None)?;
let start = Instant::now();
let reader = MmapReader::new(&test_file)?;
let processor = Decoder::new(writer);
reader.process_parallel(processor.clone(), n_threads)?;
let duration = start.elapsed();
let n_records = processor.num_records();
eprintln!("Time: {:?}", duration);
eprintln!("Records: {}", n_records);
eprintln!(
"Throughput: {:.2}M records/s",
n_records as f64 / duration.as_millis() as f64 * 1000.0 / 1_000_000.0
);
Ok(())
}