use binseq::{BinseqReader, BinseqRecord, ParallelProcessor, ParallelReader, Result};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Record-counting processor used for both the full-file and the range run.
///
/// Cloning yields a handle that shares the same `counter`, so a clone can be
/// handed to the reader while the original is kept around to read the total.
#[derive(Clone)]
struct RangeProcessor {
    /// Tally of processed records, shared by every clone of this processor.
    counter: Arc<AtomicUsize>,
    /// Thread id stored via `set_tid`; `None` until one is assigned.
    tid: Option<usize>,
    /// Start of the range this processor was created for (only used in log output).
    range_start: usize,
    /// End of the range this processor was created for (only used in log output).
    range_end: usize,
}

impl RangeProcessor {
    /// Creates a processor with a zeroed counter and no thread id, labelled with
    /// the given range bounds.
    fn new(range_start: usize, range_end: usize) -> Self {
        let counter = Arc::new(AtomicUsize::default());
        Self {
            counter,
            tid: None,
            range_start,
            range_end,
        }
    }

    /// Returns the number of records counted so far across all clones.
    fn count(&self) -> usize {
        // Relaxed is enough for a plain tally: no other memory is published
        // through this counter.
        let tally: &AtomicUsize = &self.counter;
        tally.load(Ordering::Relaxed)
    }
}
impl ParallelProcessor for RangeProcessor {
    /// Counts one record and periodically prints a progress line.
    ///
    /// The shared counter is always incremented. A line is printed only when the
    /// counter value *before* the increment was a multiple of 10,000 (which
    /// includes the very first record) and a thread id has been assigned.
    fn process_record<R: BinseqRecord>(&mut self, record: R) -> Result<()> {
        // `fetch_add` hands back the value prior to the increment.
        let previous = self.counter.fetch_add(1, Ordering::Relaxed);

        match self.tid {
            Some(tid) if previous % 10_000 == 0 => {
                println!(
                    "Thread {}: Processed {} records (Range: {}-{}, Index: {}, Len: {})",
                    tid,
                    previous + 1,
                    self.range_start,
                    self.range_end,
                    record.index(),
                    record.sseq().len(),
                );
            }
            _ => {}
        }

        Ok(())
    }

    /// Remembers the thread id so log lines can be attributed to it.
    fn set_tid(&mut self, tid: usize) {
        self.tid = Some(tid);
    }

    /// Returns the thread id stored by `set_tid`, if any.
    fn get_tid(&self) -> Option<usize> {
        self.tid
    }

    /// Prints a completion notice when a thread id is known; never fails.
    // NOTE(review): presumably invoked by the reader after each batch — confirm
    // against the `ParallelProcessor` trait documentation.
    fn on_batch_complete(&mut self) -> Result<()> {
        match self.tid {
            Some(tid) => println!("Thread {tid} completed batch processing"),
            None => {}
        }
        Ok(())
    }
}
fn main() -> Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!(
"Usage: {} <binseq_file> [num_threads] [start] [end]",
args[0]
);
eprintln!("Example: {} data/subset.bq 4 1000 5000", args[0]);
std::process::exit(1);
}
let file_path = &args[1];
let num_threads = args
.get(2)
.unwrap_or(&"4".to_string())
.parse::<usize>()
.map_err(|e| binseq::Error::from(anyhow::Error::from(e)))?;
let reader = BinseqReader::new(file_path)?;
let total_records = reader.num_records()?;
println!("File: {file_path}");
println!("Total records in file: {total_records}");
let start = args
.get(3)
.map(|s| s.parse::<usize>())
.transpose()
.map_err(|e| binseq::Error::from(anyhow::Error::from(e)))?
.unwrap_or(0);
let end = args
.get(4)
.map(|s| s.parse::<usize>())
.transpose()
.map_err(|e| binseq::Error::from(anyhow::Error::from(e)))?
.unwrap_or(total_records.min(10_000));
if start >= total_records {
eprintln!("Error: Start index {start} is >= total records {total_records}");
std::process::exit(1);
}
if end > total_records {
eprintln!(
"Warning: End index {end} is > total records {total_records}, clamping to {total_records}"
);
}
let end = end.min(total_records);
if start >= end {
eprintln!("Error: Start index {start} must be < end index {end}");
std::process::exit(1);
}
println!(
"Processing range: {} to {} ({} records)",
start,
end,
end - start
);
println!("Using {num_threads} threads");
println!();
println!("=== Processing full file ===");
let reader_full = BinseqReader::new(file_path)?;
let processor_full = RangeProcessor::new(0, total_records);
let start_time = std::time::Instant::now();
reader_full.process_parallel(processor_full.clone(), num_threads)?;
let elapsed_full = start_time.elapsed();
println!("Full file processing completed!");
println!("Records processed: {}", processor_full.count());
println!("Time taken: {elapsed_full:.2?}");
println!();
println!("=== Processing specific range ===");
let reader_range = BinseqReader::new(file_path)?;
let processor_range = RangeProcessor::new(start, end);
let start_time = std::time::Instant::now();
reader_range.process_parallel_range(processor_range.clone(), num_threads, start..end)?;
let elapsed_range = start_time.elapsed();
println!("Range processing completed!");
println!("Records processed: {}", processor_range.count());
println!("Expected records: {}", end - start);
println!("Time taken: {elapsed_range:.2?}");
if processor_range.count() > 0 && processor_full.count() > 0 {
let full_rate = processor_full.count() as f64 / elapsed_full.as_secs_f64();
let range_rate = processor_range.count() as f64 / elapsed_range.as_secs_f64();
println!();
println!("=== Performance Comparison ===");
println!("Full file rate: {full_rate:.0} records/sec");
println!("Range rate: {range_rate:.0} records/sec");
if range_rate > full_rate {
println!(
"Range processing was {:.1}x faster per record",
range_rate / full_rate
);
} else {
println!(
"Full file processing was {:.1}x faster per record",
full_rate / range_rate
);
}
}
Ok(())
}