channels/
channels.rs

1use crossbeam::queue::ArrayQueue;
2use minimap2::*;
3use needletail::{FastxReader, parse_fastx_file};
4
5use std::path::PathBuf;
6use std::{error::Error, path::Path, sync::Arc, time::Duration};
7
8use clap::Parser;
9
10/// We use a worker queue to pass around work between threads.
11/// We do it this way to be generic over the type.
12enum WorkQueue<T> {
13    Work(T),
14    Result(T),
15}
16
17// Not necessary to make types, but it helps keep things straightforward
18
19// We work on distinct WorkUnits (aka a single sequence)
20type WorkUnit = (Vec<u8>, Vec<u8>); // Sequence ID, Sequence
21
22// We return the original sequence, and the vector of mappings (results)
23type WorkResult = (WorkUnit, Vec<Mapping>);
24// We could also choose to return just the Seq ID, and the Mappings should also have the query name too
25// You can return whatever would be most useful
26
27#[derive(Parser, Debug)]
28#[command(
29    name = "minimap2-channels-example",
30    about = "An example of how to use the minimap2 crate with multithreading"
31)]
32struct Cli {
33    /// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format)
34    pub target: PathBuf,
35
36    /// The query file to align (e.g. reads - can be FASTA or FASTQ)
37    pub query: PathBuf,
38
39    /// The number of threads to use
40    pub threads: usize,
41}
42
43fn main() {
44    // Parse command line arguments
45    let args = Cli::parse();
46
47    map(args.target, args.query, args.threads).expect("Unable to map");
48}
49
50fn map(
51    target_file: impl AsRef<Path>,
52    query_file: impl AsRef<Path>,
53    threads: usize,
54) -> Result<(), Box<dyn Error>> {
55    // Aligner gets created using the build pattern.
56    // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
57    println!("Creating index");
58    let aligner = Aligner::builder()
59        .map_ont()
60        .with_cigar()
61        .with_index_threads(threads)
62        .with_index(target_file, None)
63        .expect("Unable to build index");
64
65    println!("Index created");
66
67    // Create a queue for work and for results
68    let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69    let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71    // I use a shutdown flag
72    let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74    // Store join handles, it's just good practice to clean up threads
75    let mut jh = Vec::new();
76
77    let aligner = Arc::new(aligner);
78
79    // Spin up the threads
80    for _ in 0..threads {
81        // Clone everything we will need...
82        let work_queue = Arc::clone(&work_queue);
83        let results_queue = Arc::clone(&results_queue);
84        let shutdown = Arc::clone(&shutdown);
85        let aligner = Arc::clone(&aligner);
86
87        let handle =
88            std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90        jh.push(handle);
91    }
92
93    // Let's split this into another thread
94
95    {
96        let work_queue = Arc::clone(&work_queue);
97        let shutdown = Arc::clone(&shutdown);
98        let query_file = query_file.as_ref().to_path_buf();
99
100        let handle = std::thread::spawn(move || {
101            // Now that the threads are running, read the input file and push the work to the queue
102            let mut reader: Box<dyn FastxReader> = parse_fastx_file(query_file)
103                .unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105            // I just do this in the main thread, but you can split threads
106            let backoff = crossbeam::utils::Backoff::new();
107            while let Some(Ok(record)) = reader.next() {
108                let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109                while let Err(work_packet) = work_queue.push(work) {
110                    work = work_packet; // Simple way to maintain ownership
111                    // If we have an error, it's 99% because the queue is full
112                    backoff.snooze();
113                }
114            }
115
116            // Set the shutdown flag
117            shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118        });
119
120        jh.push(handle);
121    }
122
123    let mut num_alignments = 0;
124
125    let backoff = crossbeam::utils::Backoff::new();
126    loop {
127        match results_queue.pop() {
128            // This is where we processs mapping results as they come in...
129            Some(WorkQueue::Result((record, alignments))) => {
130                num_alignments += alignments.len();
131            }
132            Some(_) => unimplemented!("Unexpected result type"),
133            None => {
134                backoff.snooze();
135
136                // If all join handles are finished, we can break
137                if jh.iter().all(|h| h.is_finished()) {
138                    break;
139                }
140                if backoff.is_completed() {
141                    backoff.reset();
142                    std::thread::sleep(Duration::from_millis(3));
143                }
144            }
145        }
146    }
147
148    // Join all the threads
149    for handle in jh {
150        handle.join().expect("Unable to join thread");
151    }
152
153    println!("Total alignments: {}", num_alignments);
154
155    Ok(())
156}
157
158// Convert this to a function
159fn worker(
160    work_queue: Arc<ArrayQueue<WorkQueue<WorkUnit>>>,
161    results_queue: Arc<ArrayQueue<WorkQueue<WorkResult>>>,
162    shutdown: Arc<std::sync::atomic::AtomicBool>,
163    aligner: Arc<Aligner<Built>>,
164) {
165    loop {
166        // We use backoff to sleep when we don't have any work
167        let backoff = crossbeam::utils::Backoff::new();
168
169        match work_queue.pop() {
170            Some(WorkQueue::Work(sequence)) => {
171                let alignment = aligner
172                    .map(&sequence.1, true, false, None, None, Some(&sequence.0))
173                    .expect("Unable to align");
174
175                // Return the original sequence, as well as the mappings
176                // We have to do it this way because ownership
177                let mut result = WorkQueue::Result((sequence, alignment));
178                while let Err(result_packet) = results_queue.push(result) {
179                    result = result_packet; // Simple way to maintain ownership
180                    // If we have an error, it's 99% because the queue is full
181                    backoff.snooze();
182                }
183            }
184            Some(_) => unimplemented!("Unexpected work type"),
185            None => {
186                backoff.snooze();
187
188                // If we have the shutdown signal, we should exit
189                if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() {
190                    break;
191                }
192            }
193        }
194    }
195}