use std::{
fs::File,
io::BufWriter,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use anyhow::{bail, Result};
use binseq::{
bq::{self, BinseqHeaderBuilder},
BinseqReader, BinseqRecord, Context, ParallelProcessor, ParallelReader,
};
use nucgen::Sequence;
#[derive(Clone, Default)]
pub struct MyProcessor {
local_counter: usize,
counter: Arc<AtomicUsize>,
ctx: Context,
}
impl MyProcessor {
#[must_use]
pub fn counter(&self) -> usize {
self.counter.load(Ordering::Relaxed)
}
}
impl ParallelProcessor for MyProcessor {
fn process_record<R: BinseqRecord>(&mut self, record: R) -> binseq::Result<()> {
self.ctx.fill_sequences(&record)?;
self.local_counter += 1;
Ok(())
}
fn on_batch_complete(&mut self) -> binseq::Result<()> {
self.counter
.fetch_add(self.local_counter, Ordering::Relaxed);
self.local_counter = 0;
Ok(())
}
}
fn mmap_processing(binseq_path: &str, n_threads: usize) -> Result<()> {
let reader = BinseqReader::new(binseq_path)?;
let proc = MyProcessor::default();
reader.process_parallel(proc.clone(), n_threads)?;
Ok(())
}
pub fn main() -> Result<()> {
let binseq_path_single = "./data/test.bq";
let binseq_path_paired = "./data/test_paired.bq";
let r1_size = 150;
let r2_size = 300;
let num_seq = 1_000_000;
time_it(
|| {
write_single(binseq_path_single, num_seq, r1_size)?;
Ok(())
},
"write_single",
);
time_it(
|| {
write_paired(binseq_path_paired, num_seq, r1_size, r2_size)?;
Ok(())
},
"write_paired",
);
for n_threads in 1..=16 {
if n_threads > 1 && n_threads % 2 != 0 {
continue;
}
time_it(
|| {
mmap_processing(binseq_path_single, n_threads)?;
Ok(())
},
&format!("single - mmap_parallel_processing ({n_threads})"),
);
}
for n_threads in 1..=16 {
if n_threads > 1 && n_threads % 2 != 0 {
continue;
}
time_it(
|| {
mmap_processing(binseq_path_paired, n_threads)?;
Ok(())
},
&format!("paired - mmap_parallel_processing ({n_threads})"),
);
}
Ok(())
}
fn time_it<F>(f: F, name: &str)
where
F: Fn() -> Result<()>,
{
let now = std::time::Instant::now();
f().unwrap();
let elapsed = now.elapsed();
eprintln!("Elapsed time ({name}): {elapsed:?}");
}
fn write_single(binseq_path: &str, num_seq: usize, seq_size: usize) -> Result<()> {
let header = BinseqHeaderBuilder::new().slen(seq_size as u32).build()?;
let out_handle = File::create(binseq_path).map(BufWriter::new)?;
let mut writer = bq::BinseqWriterBuilder::default()
.header(header)
.build(out_handle)?;
let mut sequence = Sequence::new();
let mut rng = rand::rng();
for _ in 0..num_seq {
sequence.fill_buffer(&mut rng, seq_size);
if !writer.write_record(Some(0), sequence.bytes())? {
bail!("Error writing nucleotides")
}
}
writer.flush()?;
eprintln!("Finished writing {num_seq} records to path: {binseq_path}");
Ok(())
}
fn write_paired(binseq_path: &str, num_seq: usize, r1_size: usize, r2_size: usize) -> Result<()> {
let header = bq::BinseqHeaderBuilder::new()
.slen(r1_size as u32)
.xlen(r2_size as u32)
.build()?;
let out_handle = File::create(binseq_path).map(BufWriter::new)?;
let mut writer = bq::BinseqWriterBuilder::default()
.header(header)
.build(out_handle)?;
let mut r1 = Sequence::new();
let mut r2 = Sequence::new();
let mut rng = rand::rng();
for _ in 0..num_seq {
r1.fill_buffer(&mut rng, r1_size);
r2.fill_buffer(&mut rng, r2_size);
if !writer.write_paired_record(Some(0), r1.bytes(), r2.bytes())? {
bail!("Error writing nucleotides")
}
}
writer.flush()?;
eprintln!("Finished writing {num_seq} records to path: {binseq_path}");
Ok(())
}