use crossbeam::queue::ArrayQueue;
use minimap2::*;
use needletail::{FastxReader, parse_fastx_file};
use std::path::PathBuf;
use std::{error::Error, path::Path, sync::Arc, time::Duration};
use clap::Parser;
/// Tagged message carried on the inter-thread queues.
///
/// Despite the name this is not a queue itself: it wraps the payload `T` that
/// is stored inside an `ArrayQueue`. In this file the reader thread pushes
/// `Work` items and the worker threads push `Result` items; a consumer that
/// pops the other variant treats it as a programming error.
enum WorkQueue<T> {
/// A unit of work waiting to be processed.
Work(T),
/// The outcome of processing a unit of work.
Result(T),
}
/// One query record as raw bytes: `(record id, record sequence)`.
type WorkUnit = (Vec<u8>, Vec<u8>);
/// A processed query: the original `WorkUnit` plus every mapping the aligner
/// returned for it.
type WorkResult = (WorkUnit, Vec<Mapping>);
// Command-line arguments, parsed by clap's derive API.
//
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments
// on a derived parser into help text, so doc comments here would change the
// program's `--help` output.
//
// No `#[arg(...)]` attributes are given, so clap is expected to treat the
// fields as positional arguments in declaration order — confirm against the
// clap version in use.
#[derive(Parser, Debug)]
#[command(
name = "minimap2-channels-example",
about = "An example of how to use the minimap2 crate with multithreading"
)]
struct Cli {
// Path of the file the index is built from (passed to `with_index`).
pub target: PathBuf,
// Path of the FASTA/FASTQ file whose records are aligned.
pub query: PathBuf,
// Thread count, used both for index construction and for spawning workers.
pub threads: usize,
}
fn main() {
let args = Cli::parse();
map(args.target, args.query, args.threads).expect("Unable to map");
}
/// Aligns every record of `query_file` against an index built from
/// `target_file` and prints the total number of alignments found.
///
/// The pipeline consists of:
/// * one reader thread that parses `query_file` and pushes each record onto a
///   bounded work queue,
/// * `threads` worker threads (at least one) running [`worker`], which pop
///   records, align them, and push the outcome onto a bounded results queue,
/// * the calling thread, which drains the results queue and counts alignments.
///
/// # Errors
///
/// Returns an error if the query file cannot be opened or if one of its
/// records fails to parse.
///
/// # Panics
///
/// Panics if the index cannot be built or if any spawned thread panicked.
fn map(
    target_file: impl AsRef<Path>,
    query_file: impl AsRef<Path>,
    threads: usize,
) -> Result<(), Box<dyn Error>> {
    /// Raises the shutdown flag when dropped, so the workers are released on
    /// every exit path of the reader thread: normal completion, early error
    /// return, or panic. Without this, a failing reader would leave the
    /// workers polling forever.
    struct ShutdownOnDrop(Arc<std::sync::atomic::AtomicBool>);

    impl Drop for ShutdownOnDrop {
        fn drop(&mut self) {
            // Release: every push made before this store is visible to a
            // thread that observes the flag with an acquiring load.
            self.0.store(true, std::sync::atomic::Ordering::Release);
        }
    }

    println!("Creating index");
    let aligner = Aligner::builder()
        .map_ont()
        .with_cigar()
        .with_index_threads(threads)
        .with_index(target_file, None)
        .expect("Unable to build index");
    println!("Index created");
    let aligner = Arc::new(aligner);

    let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
    let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
    let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));

    // With zero workers nothing would ever consume the work queue: the run
    // would either report 0 alignments or hang once the queue filled up.
    let worker_count = threads.max(1);
    let workers: Vec<_> = (0..worker_count)
        .map(|_| {
            let work_queue = Arc::clone(&work_queue);
            let results_queue = Arc::clone(&results_queue);
            let shutdown = Arc::clone(&shutdown);
            let aligner = Arc::clone(&aligner);
            std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner))
        })
        .collect();

    let reader_handle = {
        let work_queue = Arc::clone(&work_queue);
        let shutdown_guard = ShutdownOnDrop(Arc::clone(&shutdown));
        let query_file = query_file.as_ref().to_path_buf();
        std::thread::spawn(move || -> Result<(), String> {
            // Owned by the closure so it is dropped however the thread ends.
            let _shutdown_guard = shutdown_guard;

            let mut reader: Box<dyn FastxReader> = parse_fastx_file(query_file)
                .map_err(|e| format!("Can't find query FASTA file: {}", e))?;

            let backoff = crossbeam::utils::Backoff::new();
            while let Some(record) = reader.next() {
                // Surface malformed records instead of silently truncating
                // the input and reporting a partial count as success.
                let record =
                    record.map_err(|e| format!("Unable to parse query record: {}", e))?;
                let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
                // The queue is bounded; retry until a worker frees a slot.
                while let Err(rejected) = work_queue.push(work) {
                    work = rejected;
                    backoff.snooze();
                }
                backoff.reset();
            }
            Ok(())
        })
    };

    // Extracts the alignment count from a popped results-queue item.
    let count_alignments = |item: WorkQueue<WorkResult>| -> usize {
        match item {
            WorkQueue::Result((_record, alignments)) => alignments.len(),
            WorkQueue::Work(_) => unimplemented!("Unexpected result type"),
        }
    };

    let mut num_alignments = 0;
    let backoff = crossbeam::utils::Backoff::new();
    loop {
        if let Some(item) = results_queue.pop() {
            num_alignments += count_alignments(item);
            continue;
        }

        let all_finished =
            reader_handle.is_finished() && workers.iter().all(|h| h.is_finished());
        if all_finished {
            // A worker may have pushed its last results between the failed
            // pop above and the completion check. No thread can push any
            // more, so one final drain collects everything.
            while let Some(item) = results_queue.pop() {
                num_alignments += count_alignments(item);
            }
            break;
        }

        backoff.snooze();
        if backoff.is_completed() {
            // Spinning and yielding did not help; sleep briefly instead.
            backoff.reset();
            std::thread::sleep(Duration::from_millis(3));
        }
    }

    for handle in workers {
        handle.join().expect("Unable to join thread");
    }
    reader_handle.join().expect("Unable to join thread")?;

    println!("Total alignments: {}", num_alignments);
    Ok(())
}
/// Worker thread body: pops query records from `work_queue`, aligns them with
/// `aligner`, and pushes each record together with its mappings onto
/// `results_queue`.
///
/// The loop ends once `shutdown` has been raised and `work_queue` is observed
/// empty.
///
/// # Panics
///
/// Panics if the aligner reports an error for a record, or if a `Result`
/// variant is found on the work queue.
fn worker(
    work_queue: Arc<ArrayQueue<WorkQueue<WorkUnit>>>,
    results_queue: Arc<ArrayQueue<WorkQueue<WorkResult>>>,
    shutdown: Arc<std::sync::atomic::AtomicBool>,
    aligner: Arc<Aligner<Built>>,
) {
    // A single backoff for the whole thread. Creating a fresh one on every
    // iteration would restart it at its shortest wait each time, turning an
    // idle worker into a hot spin loop.
    let backoff = crossbeam::utils::Backoff::new();
    loop {
        match work_queue.pop() {
            Some(WorkQueue::Work(sequence)) => {
                // Work is available again, so idle waiting starts over.
                backoff.reset();
                // Tuple layout is (id, sequence): `.1` is aligned and `.0` is
                // passed as the query name. The remaining arguments are kept
                // exactly as before — see the minimap2 crate's `Aligner::map`
                // documentation for their meaning.
                let alignment = aligner
                    .map(&sequence.1, true, false, None, None, Some(&sequence.0))
                    .expect("Unable to align");
                let mut result = WorkQueue::Result((sequence, alignment));
                // The results queue is bounded; retry until the consumer
                // frees a slot.
                while let Err(rejected) = results_queue.push(result) {
                    result = rejected;
                    backoff.snooze();
                }
            }
            Some(WorkQueue::Result(_)) => unimplemented!("Unexpected work type"),
            None => {
                // Acquire: once the flag is seen, all pushes made before it
                // was raised are visible, so the emptiness check that follows
                // cannot miss a late item.
                if shutdown.load(std::sync::atomic::Ordering::Acquire) && work_queue.is_empty() {
                    break;
                }
                backoff.snooze();
            }
        }
    }
}