1use crossbeam::queue::ArrayQueue;
2use minimap2::*;
3use needletail::{FastxReader, parse_fastx_file};
4
5use std::path::PathBuf;
6use std::{error::Error, path::Path, sync::Arc, time::Duration};
7
8use clap::Parser;
9
10enum WorkQueue<T> {
13 Work(T),
14 Result(T),
15}
16
17type WorkUnit = (Vec<u8>, Vec<u8>); type WorkResult = (WorkUnit, Vec<Mapping>);
24#[derive(Parser, Debug)]
28#[command(
29 name = "minimap2-channels-example",
30 about = "An example of how to use the minimap2 crate with multithreading"
31)]
32struct Cli {
33 pub target: PathBuf,
35
36 pub query: PathBuf,
38
39 pub threads: usize,
41}
42
43fn main() {
44 let args = Cli::parse();
46
47 map(args.target, args.query, args.threads).expect("Unable to map");
48}
49
50fn map(
51 target_file: impl AsRef<Path>,
52 query_file: impl AsRef<Path>,
53 threads: usize,
54) -> Result<(), Box<dyn Error>> {
55 println!("Creating index");
58 let aligner = Aligner::builder()
59 .map_ont()
60 .with_cigar()
61 .with_index_threads(threads)
62 .with_index(target_file, None)
63 .expect("Unable to build index");
64
65 println!("Index created");
66
67 let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69 let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74 let mut jh = Vec::new();
76
77 let aligner = Arc::new(aligner);
78
79 for _ in 0..threads {
81 let work_queue = Arc::clone(&work_queue);
83 let results_queue = Arc::clone(&results_queue);
84 let shutdown = Arc::clone(&shutdown);
85 let aligner = Arc::clone(&aligner);
86
87 let handle =
88 std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90 jh.push(handle);
91 }
92
93 {
96 let work_queue = Arc::clone(&work_queue);
97 let shutdown = Arc::clone(&shutdown);
98 let query_file = query_file.as_ref().to_path_buf();
99
100 let handle = std::thread::spawn(move || {
101 let mut reader: Box<dyn FastxReader> = parse_fastx_file(query_file)
103 .unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105 let backoff = crossbeam::utils::Backoff::new();
107 while let Some(Ok(record)) = reader.next() {
108 let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109 while let Err(work_packet) = work_queue.push(work) {
110 work = work_packet; backoff.snooze();
113 }
114 }
115
116 shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118 });
119
120 jh.push(handle);
121 }
122
123 let mut num_alignments = 0;
124
125 let backoff = crossbeam::utils::Backoff::new();
126 loop {
127 match results_queue.pop() {
128 Some(WorkQueue::Result((record, alignments))) => {
130 num_alignments += alignments.len();
131 }
132 Some(_) => unimplemented!("Unexpected result type"),
133 None => {
134 backoff.snooze();
135
136 if jh.iter().all(|h| h.is_finished()) {
138 break;
139 }
140 if backoff.is_completed() {
141 backoff.reset();
142 std::thread::sleep(Duration::from_millis(3));
143 }
144 }
145 }
146 }
147
148 for handle in jh {
150 handle.join().expect("Unable to join thread");
151 }
152
153 println!("Total alignments: {}", num_alignments);
154
155 Ok(())
156}
157
158fn worker(
160 work_queue: Arc<ArrayQueue<WorkQueue<WorkUnit>>>,
161 results_queue: Arc<ArrayQueue<WorkQueue<WorkResult>>>,
162 shutdown: Arc<std::sync::atomic::AtomicBool>,
163 aligner: Arc<Aligner<Built>>,
164) {
165 loop {
166 let backoff = crossbeam::utils::Backoff::new();
168
169 match work_queue.pop() {
170 Some(WorkQueue::Work(sequence)) => {
171 let alignment = aligner
172 .map(&sequence.1, true, false, None, None, Some(&sequence.0))
173 .expect("Unable to align");
174
175 let mut result = WorkQueue::Result((sequence, alignment));
178 while let Err(result_packet) = results_queue.push(result) {
179 result = result_packet; backoff.snooze();
182 }
183 }
184 Some(_) => unimplemented!("Unexpected work type"),
185 None => {
186 backoff.snooze();
187
188 if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() {
190 break;
191 }
192 }
193 }
194 }
195}