pub struct Aligner<S: BuilderState> {
pub idxopt: IdxOpt,
pub mapopt: MapOpt,
pub threads: usize,
pub idx: Option<Arc<MmIdx>>,
pub idx_reader: Option<Arc<mm_idx_reader_t>>,
pub cigar_clipping: bool,
/* private fields */
}
Expand description
Aligner struct, mimicking minimap2’s Python interface (mappy).
Aligner::builder();
Fields§
idxopt: IdxOpt
Index options passed to minimap2 (mm_idxopt_t)
mapopt: MapOpt
Mapping options passed to minimap2 (mm_mapopt_t)
threads: usize
Number of threads to create the index with
idx: Option<Arc<MmIdx>>
Index created by minimap2
idx_reader: Option<Arc<mm_idx_reader_t>>
Index reader created by minimap2
cigar_clipping: bool
Whether to add soft clipping to CIGAR result
Implementations§
Source§impl Aligner<()>
impl Aligner<()>
Sourcepub fn builder() -> Aligner<Unset>
pub fn builder() -> Aligner<Unset>
Create a new aligner with default options
Examples found in repository?
32fn map(
33 target_file: impl AsRef<Path>,
34 query_file: impl AsRef<Path>,
35 threads: usize,
36) -> Result<(), Box<dyn Error>> {
37 // Set the number of threads to use
38 rayon::ThreadPoolBuilder::new()
39 .num_threads(threads)
40 .build_global()
41 .expect("Unable to set number of threads");
42
43 println!("Creating index");
44
45 // Aligner gets created using the build pattern.
46 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
47 let aligner = Aligner::builder()
48 .map_ont()
49 .with_cigar()
50 .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
51 .with_index(target_file, None)
52 .expect("Unable to build index");
53
54 println!("Index created");
55
56 // Read in the query file
57 let mut reader = parse_fastx_file(query_file)?;
58
59 let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
60 while let Some(record) = reader.next() {
61 let record = record.expect("Error reading record");
62 queries.push((record.id().to_vec(), record.seq().to_vec()));
63 }
64
65 // Map the queries
66 let results: Vec<Vec<Mapping>> = queries
67 .par_iter()
68 .map(|(id, seq)| {
69 aligner
70 .map(&seq, false, false, None, None, Some(&id))
71 .expect("Error mapping")
72 })
73 .collect();
74
75 // Count total number of alignments
76 let total_alignments: usize = results.iter().map(|x| x.len()).sum();
77 println!("Iteration complete, total alignments {}", total_alignments);
78
79 Ok(())
80}
More examples
50fn map(
51 target_file: impl AsRef<Path>,
52 query_file: impl AsRef<Path>,
53 threads: usize,
54) -> Result<(), Box<dyn Error>> {
55 // Aligner gets created using the build pattern.
56 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
57 println!("Creating index");
58 let aligner = Aligner::builder()
59 .map_ont()
60 .with_cigar()
61 .with_index_threads(threads)
62 .with_index(target_file, None)
63 .expect("Unable to build index");
64
65 println!("Index created");
66
67 // Create a queue for work and for results
68 let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69 let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71 // I use a shutdown flag
72 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74 // Store join handles, it's just good practice to clean up threads
75 let mut jh = Vec::new();
76
77 let aligner = Arc::new(aligner);
78
79 // Spin up the threads
80 for _ in 0..threads {
81 // Clone everything we will need...
82 let work_queue = Arc::clone(&work_queue);
83 let results_queue = Arc::clone(&results_queue);
84 let shutdown = Arc::clone(&shutdown);
85 let aligner = Arc::clone(&aligner);
86
87 let handle =
88 std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90 jh.push(handle);
91 }
92
93 // Let's split this into another thread
94
95 {
96 let work_queue = Arc::clone(&work_queue);
97 let shutdown = Arc::clone(&shutdown);
98 let query_file = query_file.as_ref().to_path_buf();
99
100 let handle = std::thread::spawn(move || {
101 // Now that the threads are running, read the input file and push the work to the queue
102 let mut reader: Box<dyn FastxReader> =
103 parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105 // I just do this in the main thread, but you can split threads
106 let backoff = crossbeam::utils::Backoff::new();
107 while let Some(Ok(record)) = reader.next() {
108 let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109 while let Err(work_packet) = work_queue.push(work) {
110 work = work_packet; // Simple way to maintain ownership
111 // If we have an error, it's 99% because the queue is full
112 backoff.snooze();
113 }
114 }
115
116 // Set the shutdown flag
117 shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118 });
119
120 jh.push(handle);
121 }
122
123 let mut num_alignments = 0;
124
125 let backoff = crossbeam::utils::Backoff::new();
126 loop {
127 match results_queue.pop() {
128 // This is where we processs mapping results as they come in...
129 Some(WorkQueue::Result((record, alignments))) => {
130 num_alignments += alignments.len();
131
132 }
133 Some(_) => unimplemented!("Unexpected result type"),
134 None => {
135 backoff.snooze();
136
137 // If all join handles are finished, we can break
138 if jh.iter().all(|h| h.is_finished()) {
139 break;
140 }
141 if backoff.is_completed() {
142 backoff.reset();
143 std::thread::sleep(Duration::from_millis(3));
144 }
145 }
146 }
147 }
148
149 // Join all the threads
150 for handle in jh {
151 handle.join().expect("Unable to join thread");
152 }
153
154 println!("Total alignments: {}", num_alignments);
155
156 Ok(())
157}
Source§impl Aligner<Unset>
impl Aligner<Unset>
Sourcepub fn lrhq(self) -> Aligner<PresetSet>
pub fn lrhq(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to lr:hq.
Presets should be called before any other options are set, as they change multiple options at once.
Sourcepub fn splice(self) -> Aligner<PresetSet>
pub fn splice(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to splice
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().splice();
Sourcepub fn splice_hq(self) -> Aligner<PresetSet>
pub fn splice_hq(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to splice:hq
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().splice_hq();
Sourcepub fn asm(self) -> Aligner<PresetSet>
pub fn asm(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to Asm
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().asm();
Sourcepub fn asm5(self) -> Aligner<PresetSet>
pub fn asm5(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to Asm5
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().asm5();
Sourcepub fn asm10(self) -> Aligner<PresetSet>
pub fn asm10(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to Asm10
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().asm10();
Sourcepub fn asm20(self) -> Aligner<PresetSet>
pub fn asm20(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to Asm20
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().asm20();
Sourcepub fn sr(self) -> Aligner<PresetSet>
pub fn sr(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to sr
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().sr();
Sourcepub fn map_pb(self) -> Aligner<PresetSet>
pub fn map_pb(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to MapPb
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().map_pb();
Sourcepub fn map_hifi(self) -> Aligner<PresetSet>
pub fn map_hifi(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to MapHifi
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().map_hifi();
Sourcepub fn map_ont(self) -> Aligner<PresetSet>
pub fn map_ont(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to MapOnt.
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().map_ont();
Examples found in repository?
32fn map(
33 target_file: impl AsRef<Path>,
34 query_file: impl AsRef<Path>,
35 threads: usize,
36) -> Result<(), Box<dyn Error>> {
37 // Set the number of threads to use
38 rayon::ThreadPoolBuilder::new()
39 .num_threads(threads)
40 .build_global()
41 .expect("Unable to set number of threads");
42
43 println!("Creating index");
44
45 // Aligner gets created using the build pattern.
46 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
47 let aligner = Aligner::builder()
48 .map_ont()
49 .with_cigar()
50 .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
51 .with_index(target_file, None)
52 .expect("Unable to build index");
53
54 println!("Index created");
55
56 // Read in the query file
57 let mut reader = parse_fastx_file(query_file)?;
58
59 let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
60 while let Some(record) = reader.next() {
61 let record = record.expect("Error reading record");
62 queries.push((record.id().to_vec(), record.seq().to_vec()));
63 }
64
65 // Map the queries
66 let results: Vec<Vec<Mapping>> = queries
67 .par_iter()
68 .map(|(id, seq)| {
69 aligner
70 .map(&seq, false, false, None, None, Some(&id))
71 .expect("Error mapping")
72 })
73 .collect();
74
75 // Count total number of alignments
76 let total_alignments: usize = results.iter().map(|x| x.len()).sum();
77 println!("Iteration complete, total alignments {}", total_alignments);
78
79 Ok(())
80}
More examples
50fn map(
51 target_file: impl AsRef<Path>,
52 query_file: impl AsRef<Path>,
53 threads: usize,
54) -> Result<(), Box<dyn Error>> {
55 // Aligner gets created using the build pattern.
56 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
57 println!("Creating index");
58 let aligner = Aligner::builder()
59 .map_ont()
60 .with_cigar()
61 .with_index_threads(threads)
62 .with_index(target_file, None)
63 .expect("Unable to build index");
64
65 println!("Index created");
66
67 // Create a queue for work and for results
68 let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69 let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71 // I use a shutdown flag
72 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74 // Store join handles, it's just good practice to clean up threads
75 let mut jh = Vec::new();
76
77 let aligner = Arc::new(aligner);
78
79 // Spin up the threads
80 for _ in 0..threads {
81 // Clone everything we will need...
82 let work_queue = Arc::clone(&work_queue);
83 let results_queue = Arc::clone(&results_queue);
84 let shutdown = Arc::clone(&shutdown);
85 let aligner = Arc::clone(&aligner);
86
87 let handle =
88 std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90 jh.push(handle);
91 }
92
93 // Let's split this into another thread
94
95 {
96 let work_queue = Arc::clone(&work_queue);
97 let shutdown = Arc::clone(&shutdown);
98 let query_file = query_file.as_ref().to_path_buf();
99
100 let handle = std::thread::spawn(move || {
101 // Now that the threads are running, read the input file and push the work to the queue
102 let mut reader: Box<dyn FastxReader> =
103 parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105 // I just do this in the main thread, but you can split threads
106 let backoff = crossbeam::utils::Backoff::new();
107 while let Some(Ok(record)) = reader.next() {
108 let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109 while let Err(work_packet) = work_queue.push(work) {
110 work = work_packet; // Simple way to maintain ownership
111 // If we have an error, it's 99% because the queue is full
112 backoff.snooze();
113 }
114 }
115
116 // Set the shutdown flag
117 shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118 });
119
120 jh.push(handle);
121 }
122
123 let mut num_alignments = 0;
124
125 let backoff = crossbeam::utils::Backoff::new();
126 loop {
127 match results_queue.pop() {
128 // This is where we processs mapping results as they come in...
129 Some(WorkQueue::Result((record, alignments))) => {
130 num_alignments += alignments.len();
131
132 }
133 Some(_) => unimplemented!("Unexpected result type"),
134 None => {
135 backoff.snooze();
136
137 // If all join handles are finished, we can break
138 if jh.iter().all(|h| h.is_finished()) {
139 break;
140 }
141 if backoff.is_completed() {
142 backoff.reset();
143 std::thread::sleep(Duration::from_millis(3));
144 }
145 }
146 }
147 }
148
149 // Join all the threads
150 for handle in jh {
151 handle.join().expect("Unable to join thread");
152 }
153
154 println!("Total alignments: {}", num_alignments);
155
156 Ok(())
157}
Sourcepub fn ava_pb(self) -> Aligner<PresetSet>
pub fn ava_pb(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to AvaPb
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().ava_pb();
Sourcepub fn ava_ont(self) -> Aligner<PresetSet>
pub fn ava_ont(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to AvaOnt.
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().ava_ont();
Sourcepub fn short(self) -> Aligner<PresetSet>
pub fn short(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to Short
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().short();
Sourcepub fn map10k(self) -> Aligner<PresetSet>
pub fn map10k(self) -> Aligner<PresetSet>
Ergonomic function for Aligner. Sets the minimap2 preset to Map10k
Presets should be called before any other options are set, as they change multiple options at once.
Aligner::builder().map10k();
Source§impl<S> Aligner<S>where
S: BuilderState + AcceptsParams,
impl<S> Aligner<S>where
S: BuilderState + AcceptsParams,
Sourcepub fn with_cigar(self) -> Self
pub fn with_cigar(self) -> Self
Set Alignment mode / cigar mode in minimap2
Aligner::builder().map_ont().with_cigar();
Examples found in repository?
32fn map(
33 target_file: impl AsRef<Path>,
34 query_file: impl AsRef<Path>,
35 threads: usize,
36) -> Result<(), Box<dyn Error>> {
37 // Set the number of threads to use
38 rayon::ThreadPoolBuilder::new()
39 .num_threads(threads)
40 .build_global()
41 .expect("Unable to set number of threads");
42
43 println!("Creating index");
44
45 // Aligner gets created using the build pattern.
46 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
47 let aligner = Aligner::builder()
48 .map_ont()
49 .with_cigar()
50 .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
51 .with_index(target_file, None)
52 .expect("Unable to build index");
53
54 println!("Index created");
55
56 // Read in the query file
57 let mut reader = parse_fastx_file(query_file)?;
58
59 let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
60 while let Some(record) = reader.next() {
61 let record = record.expect("Error reading record");
62 queries.push((record.id().to_vec(), record.seq().to_vec()));
63 }
64
65 // Map the queries
66 let results: Vec<Vec<Mapping>> = queries
67 .par_iter()
68 .map(|(id, seq)| {
69 aligner
70 .map(&seq, false, false, None, None, Some(&id))
71 .expect("Error mapping")
72 })
73 .collect();
74
75 // Count total number of alignments
76 let total_alignments: usize = results.iter().map(|x| x.len()).sum();
77 println!("Iteration complete, total alignments {}", total_alignments);
78
79 Ok(())
80}
More examples
50fn map(
51 target_file: impl AsRef<Path>,
52 query_file: impl AsRef<Path>,
53 threads: usize,
54) -> Result<(), Box<dyn Error>> {
55 // Aligner gets created using the build pattern.
56 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
57 println!("Creating index");
58 let aligner = Aligner::builder()
59 .map_ont()
60 .with_cigar()
61 .with_index_threads(threads)
62 .with_index(target_file, None)
63 .expect("Unable to build index");
64
65 println!("Index created");
66
67 // Create a queue for work and for results
68 let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69 let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71 // I use a shutdown flag
72 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74 // Store join handles, it's just good practice to clean up threads
75 let mut jh = Vec::new();
76
77 let aligner = Arc::new(aligner);
78
79 // Spin up the threads
80 for _ in 0..threads {
81 // Clone everything we will need...
82 let work_queue = Arc::clone(&work_queue);
83 let results_queue = Arc::clone(&results_queue);
84 let shutdown = Arc::clone(&shutdown);
85 let aligner = Arc::clone(&aligner);
86
87 let handle =
88 std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90 jh.push(handle);
91 }
92
93 // Let's split this into another thread
94
95 {
96 let work_queue = Arc::clone(&work_queue);
97 let shutdown = Arc::clone(&shutdown);
98 let query_file = query_file.as_ref().to_path_buf();
99
100 let handle = std::thread::spawn(move || {
101 // Now that the threads are running, read the input file and push the work to the queue
102 let mut reader: Box<dyn FastxReader> =
103 parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105 // I just do this in the main thread, but you can split threads
106 let backoff = crossbeam::utils::Backoff::new();
107 while let Some(Ok(record)) = reader.next() {
108 let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109 while let Err(work_packet) = work_queue.push(work) {
110 work = work_packet; // Simple way to maintain ownership
111 // If we have an error, it's 99% because the queue is full
112 backoff.snooze();
113 }
114 }
115
116 // Set the shutdown flag
117 shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118 });
119
120 jh.push(handle);
121 }
122
123 let mut num_alignments = 0;
124
125 let backoff = crossbeam::utils::Backoff::new();
126 loop {
127 match results_queue.pop() {
128 // This is where we processs mapping results as they come in...
129 Some(WorkQueue::Result((record, alignments))) => {
130 num_alignments += alignments.len();
131
132 }
133 Some(_) => unimplemented!("Unexpected result type"),
134 None => {
135 backoff.snooze();
136
137 // If all join handles are finished, we can break
138 if jh.iter().all(|h| h.is_finished()) {
139 break;
140 }
141 if backoff.is_completed() {
142 backoff.reset();
143 std::thread::sleep(Duration::from_millis(3));
144 }
145 }
146 }
147 }
148
149 // Join all the threads
150 for handle in jh {
151 handle.join().expect("Unable to join thread");
152 }
153
154 println!("Total alignments: {}", num_alignments);
155
156 Ok(())
157}
pub fn with_cigar_clipping(self) -> Self
pub fn with_sam_out(self) -> Self
pub fn with_sam_hit_only(self) -> Self
Sourcepub fn with_gap_open_penalty(
self,
penalty: i32,
penalty_long: Option<i32>,
) -> Self
pub fn with_gap_open_penalty( self, penalty: i32, penalty_long: Option<i32>, ) -> Self
Sets the gap open penalty for minimap2.
For example, `minimap2 -O 4` sets both the short and the long gap open penalty to 4 (see the minimap2 code).
To set a different long gap open penalty, provide a value for `penalty_long`.
Sourcepub fn with_index_threads(self, threads: usize) -> Self
pub fn with_index_threads(self, threads: usize) -> Self
Sets the number of threads minimap2 will use for building the index
Aligner::builder().with_index_threads(10);
Set the number of threads (prefer to use the struct config)
Examples found in repository?
32fn map(
33 target_file: impl AsRef<Path>,
34 query_file: impl AsRef<Path>,
35 threads: usize,
36) -> Result<(), Box<dyn Error>> {
37 // Set the number of threads to use
38 rayon::ThreadPoolBuilder::new()
39 .num_threads(threads)
40 .build_global()
41 .expect("Unable to set number of threads");
42
43 println!("Creating index");
44
45 // Aligner gets created using the build pattern.
46 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
47 let aligner = Aligner::builder()
48 .map_ont()
49 .with_cigar()
50 .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
51 .with_index(target_file, None)
52 .expect("Unable to build index");
53
54 println!("Index created");
55
56 // Read in the query file
57 let mut reader = parse_fastx_file(query_file)?;
58
59 let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
60 while let Some(record) = reader.next() {
61 let record = record.expect("Error reading record");
62 queries.push((record.id().to_vec(), record.seq().to_vec()));
63 }
64
65 // Map the queries
66 let results: Vec<Vec<Mapping>> = queries
67 .par_iter()
68 .map(|(id, seq)| {
69 aligner
70 .map(&seq, false, false, None, None, Some(&id))
71 .expect("Error mapping")
72 })
73 .collect();
74
75 // Count total number of alignments
76 let total_alignments: usize = results.iter().map(|x| x.len()).sum();
77 println!("Iteration complete, total alignments {}", total_alignments);
78
79 Ok(())
80}
More examples
50fn map(
51 target_file: impl AsRef<Path>,
52 query_file: impl AsRef<Path>,
53 threads: usize,
54) -> Result<(), Box<dyn Error>> {
55 // Aligner gets created using the build pattern.
56 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
57 println!("Creating index");
58 let aligner = Aligner::builder()
59 .map_ont()
60 .with_cigar()
61 .with_index_threads(threads)
62 .with_index(target_file, None)
63 .expect("Unable to build index");
64
65 println!("Index created");
66
67 // Create a queue for work and for results
68 let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69 let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71 // I use a shutdown flag
72 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74 // Store join handles, it's just good practice to clean up threads
75 let mut jh = Vec::new();
76
77 let aligner = Arc::new(aligner);
78
79 // Spin up the threads
80 for _ in 0..threads {
81 // Clone everything we will need...
82 let work_queue = Arc::clone(&work_queue);
83 let results_queue = Arc::clone(&results_queue);
84 let shutdown = Arc::clone(&shutdown);
85 let aligner = Arc::clone(&aligner);
86
87 let handle =
88 std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90 jh.push(handle);
91 }
92
93 // Let's split this into another thread
94
95 {
96 let work_queue = Arc::clone(&work_queue);
97 let shutdown = Arc::clone(&shutdown);
98 let query_file = query_file.as_ref().to_path_buf();
99
100 let handle = std::thread::spawn(move || {
101 // Now that the threads are running, read the input file and push the work to the queue
102 let mut reader: Box<dyn FastxReader> =
103 parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105 // I just do this in the main thread, but you can split threads
106 let backoff = crossbeam::utils::Backoff::new();
107 while let Some(Ok(record)) = reader.next() {
108 let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109 while let Err(work_packet) = work_queue.push(work) {
110 work = work_packet; // Simple way to maintain ownership
111 // If we have an error, it's 99% because the queue is full
112 backoff.snooze();
113 }
114 }
115
116 // Set the shutdown flag
117 shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118 });
119
120 jh.push(handle);
121 }
122
123 let mut num_alignments = 0;
124
125 let backoff = crossbeam::utils::Backoff::new();
126 loop {
127 match results_queue.pop() {
128 // This is where we processs mapping results as they come in...
129 Some(WorkQueue::Result((record, alignments))) => {
130 num_alignments += alignments.len();
131
132 }
133 Some(_) => unimplemented!("Unexpected result type"),
134 None => {
135 backoff.snooze();
136
137 // If all join handles are finished, we can break
138 if jh.iter().all(|h| h.is_finished()) {
139 break;
140 }
141 if backoff.is_completed() {
142 backoff.reset();
143 std::thread::sleep(Duration::from_millis(3));
144 }
145 }
146 }
147 }
148
149 // Join all the threads
150 for handle in jh {
151 handle.join().expect("Unable to join thread");
152 }
153
154 println!("Total alignments: {}", num_alignments);
155
156 Ok(())
157}
pub fn with_threads(self, threads: usize) -> Self
Deprecated: use with_index_threads instead.
Sourcepub fn check_opts(&self) -> Result<(), &'static str>
pub fn check_opts(&self) -> Result<(), &'static str>
Check if the options are valid - Maps to mm_check_opt in minimap2
Sourcepub fn with_index<P>(
self,
path: P,
output: Option<&str>,
) -> Result<Aligner<Built>, &'static str>
pub fn with_index<P>( self, path: P, output: Option<&str>, ) -> Result<Aligner<Built>, &'static str>
Sets the index parameters for minimap2 using the builder pattern. This also creates the index, using the number of threads configured beforehand (e.g. via with_index_threads), so you must set the number of threads before calling this function.
Parameters:
- path: location of a pre-built index, or of a FASTA/FASTQ file (gzipped or plaintext).
- output: None, or a filename under which the newly built index is saved.
Returns the aligner with the index set
// Do not save the index file
Aligner::builder().map_ont().with_index("test_data/test_data.fasta", None);
// Save the index file as my_index.mmi
Aligner::builder().map_ont().with_index("test_data/test_data.fasta", Some("my_index.mmi"));
// Use the previously built index
Aligner::builder().map_ont().with_index("my_index.mmi", None);
Examples found in repository?
32fn map(
33 target_file: impl AsRef<Path>,
34 query_file: impl AsRef<Path>,
35 threads: usize,
36) -> Result<(), Box<dyn Error>> {
37 // Set the number of threads to use
38 rayon::ThreadPoolBuilder::new()
39 .num_threads(threads)
40 .build_global()
41 .expect("Unable to set number of threads");
42
43 println!("Creating index");
44
45 // Aligner gets created using the build pattern.
46 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
47 let aligner = Aligner::builder()
48 .map_ont()
49 .with_cigar()
50 .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
51 .with_index(target_file, None)
52 .expect("Unable to build index");
53
54 println!("Index created");
55
56 // Read in the query file
57 let mut reader = parse_fastx_file(query_file)?;
58
59 let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
60 while let Some(record) = reader.next() {
61 let record = record.expect("Error reading record");
62 queries.push((record.id().to_vec(), record.seq().to_vec()));
63 }
64
65 // Map the queries
66 let results: Vec<Vec<Mapping>> = queries
67 .par_iter()
68 .map(|(id, seq)| {
69 aligner
70 .map(&seq, false, false, None, None, Some(&id))
71 .expect("Error mapping")
72 })
73 .collect();
74
75 // Count total number of alignments
76 let total_alignments: usize = results.iter().map(|x| x.len()).sum();
77 println!("Iteration complete, total alignments {}", total_alignments);
78
79 Ok(())
80}
More examples
50fn map(
51 target_file: impl AsRef<Path>,
52 query_file: impl AsRef<Path>,
53 threads: usize,
54) -> Result<(), Box<dyn Error>> {
55 // Aligner gets created using the build pattern.
56 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
57 println!("Creating index");
58 let aligner = Aligner::builder()
59 .map_ont()
60 .with_cigar()
61 .with_index_threads(threads)
62 .with_index(target_file, None)
63 .expect("Unable to build index");
64
65 println!("Index created");
66
67 // Create a queue for work and for results
68 let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
69 let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));
70
71 // I use a shutdown flag
72 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
73
74 // Store join handles, it's just good practice to clean up threads
75 let mut jh = Vec::new();
76
77 let aligner = Arc::new(aligner);
78
79 // Spin up the threads
80 for _ in 0..threads {
81 // Clone everything we will need...
82 let work_queue = Arc::clone(&work_queue);
83 let results_queue = Arc::clone(&results_queue);
84 let shutdown = Arc::clone(&shutdown);
85 let aligner = Arc::clone(&aligner);
86
87 let handle =
88 std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));
89
90 jh.push(handle);
91 }
92
93 // Let's split this into another thread
94
95 {
96 let work_queue = Arc::clone(&work_queue);
97 let shutdown = Arc::clone(&shutdown);
98 let query_file = query_file.as_ref().to_path_buf();
99
100 let handle = std::thread::spawn(move || {
101 // Now that the threads are running, read the input file and push the work to the queue
102 let mut reader: Box<dyn FastxReader> =
103 parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file"));
104
105 // I just do this in the main thread, but you can split threads
106 let backoff = crossbeam::utils::Backoff::new();
107 while let Some(Ok(record)) = reader.next() {
108 let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
109 while let Err(work_packet) = work_queue.push(work) {
110 work = work_packet; // Simple way to maintain ownership
111 // If we have an error, it's 99% because the queue is full
112 backoff.snooze();
113 }
114 }
115
116 // Set the shutdown flag
117 shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
118 });
119
120 jh.push(handle);
121 }
122
123 let mut num_alignments = 0;
124
125 let backoff = crossbeam::utils::Backoff::new();
126 loop {
127 match results_queue.pop() {
128 // This is where we processs mapping results as they come in...
129 Some(WorkQueue::Result((record, alignments))) => {
130 num_alignments += alignments.len();
131
132 }
133 Some(_) => unimplemented!("Unexpected result type"),
134 None => {
135 backoff.snooze();
136
137 // If all join handles are finished, we can break
138 if jh.iter().all(|h| h.is_finished()) {
139 break;
140 }
141 if backoff.is_completed() {
142 backoff.reset();
143 std::thread::sleep(Duration::from_millis(3));
144 }
145 }
146 }
147 }
148
149 // Join all the threads
150 for handle in jh {
151 handle.join().expect("Unable to join thread");
152 }
153
154 println!("Total alignments: {}", num_alignments);
155
156 Ok(())
157}
Sourcepub fn set_index<P>(
self,
path: P,
output: Option<&str>,
) -> Result<Aligner<Built>, &'static str>
pub fn set_index<P>( self, path: P, output: Option<&str>, ) -> Result<Aligner<Built>, &'static str>
Sets the index using the builder pattern. Returns the built aligner (Aligner<Built>) on success, or an error message on failure.
Sourcepub fn with_seq(self, seq: &[u8]) -> Result<Aligner<Built>, &'static str>
pub fn with_seq(self, seq: &[u8]) -> Result<Aligner<Built>, &'static str>
Use a single sequence as the index. Sets the sequence ID to “N/A”.
Cannot be combined with with_index or set_index.
Following the mappy implementation, this also sets mapopt.mid_occ to 1000.
let aligner = Aligner::builder().map_ont().with_seq(seq.as_bytes()).expect("Unable to build index");
let query = b"CGGCACCAGGTTAAAATCTGAGTGCTGCAATAGGCGATTACAGTACAGCACCCAGCCTCCG";
let hits = aligner.map(query, false, false, None, None, Some(b"Query Name"));
assert_eq!(hits.unwrap().len(), 1);
Sourcepub fn with_seq_and_id(
self,
seq: &[u8],
id: &[u8],
) -> Result<Aligner<Built>, &'static str>
pub fn with_seq_and_id( self, seq: &[u8], id: &[u8], ) -> Result<Aligner<Built>, &'static str>
Use a single sequence as the index, with `id` as its sequence ID.
Cannot be combined with `with_index` or `set_index`.
Following the mappy implementation, this also sets mapopt.mid_occ to 1000.
let aligner = Aligner::builder().map_ont().with_seq_and_id(seq.as_bytes(), id.as_bytes()).expect("Unable to build index");
let query = b"CGGCACCAGGTTAAAATCTGAGTGCTGCAATAGGCGATTACAGTACAGCACCCAGCCTCCG";
let hits = aligner.map(query, false, false, None, None, Some(b"Sample Query"));
assert_eq!(hits.as_ref().unwrap().len(), 1);
assert_eq!(hits.as_ref().unwrap()[0].target_name.as_ref().unwrap().as_str(), id);
Sourcepub fn with_seqs(self, seqs: &[Vec<u8>]) -> Result<Aligner<Built>, &'static str>
pub fn with_seqs(self, seqs: &[Vec<u8>]) -> Result<Aligner<Built>, &'static str>
TODO: Does not work for more than 1 seq currently!
Pass multiple sequences to build an index functionally.
Following the mappy implementation, this also sets mapopt.mid_occ to 1000.
Cannot be combined with `with_index` or `set_index`.
Sets the sequence IDs to “Unnamed Sequence n” where n is the sequence number.
Sourcepub fn with_seqs_and_ids(
self,
seqs: &[Vec<u8>],
ids: &[Vec<u8>],
) -> Result<Aligner<Built>, &'static str>
pub fn with_seqs_and_ids( self, seqs: &[Vec<u8>], ids: &[Vec<u8>], ) -> Result<Aligner<Built>, &'static str>
TODO: Does not work for more than 1 seq currently! Pass multiple sequences and corresponding IDs to build an index functionally. Following the mappy implementation, this also sets mapopt.mid_occ to 1000.
Sourcepub fn additional_preset(self, preset: Preset) -> Self
pub fn additional_preset(self, preset: Preset) -> Self
Applies an additional preset to the aligner. WARNING: This overwrites multiple other parameters, so make sure you know what you are doing.
Presets should be called before any other options are set, as they change multiple options at once.
Source§impl Aligner<Built>
impl Aligner<Built>
Sourcepub fn get_seq<'aln>(&'aln self, i: usize) -> Option<&'aln mm_idx_seq_t>
pub fn get_seq<'aln>(&'aln self, i: usize) -> Option<&'aln mm_idx_seq_t>
Gets a sequence directly from the index.
Returns a reference to the sequence at index `i`. The reference remains valid for as long as the aligner does.
Sourcepub fn map(
&self,
seq: &[u8],
cs: bool,
md: bool,
max_frag_len: Option<usize>,
extra_flags: Option<&[u64]>,
query_name: Option<&[u8]>,
) -> Result<Vec<Mapping>, &'static str>
pub fn map( &self, seq: &[u8], cs: bool, md: bool, max_frag_len: Option<usize>, extra_flags: Option<&[u64]>, query_name: Option<&[u8]>, ) -> Result<Vec<Mapping>, &'static str>
Aligns a given sequence (as bytes) to the index associated with this aligner
Parameters:
seq: Sequence to align
cs: Whether to output the cs (difference string) tag
md: Whether to output the MD tag
max_frag_len: Maximum fragment length
extra_flags: Extra flags to pass to minimap2, as a slice of u64 values
query_name: Name of the query sequence
Examples found in repository?
160fn worker(
161 work_queue: Arc<ArrayQueue<WorkQueue<WorkUnit>>>,
162 results_queue: Arc<ArrayQueue<WorkQueue<WorkResult>>>,
163 shutdown: Arc<std::sync::atomic::AtomicBool>,
164 aligner: Arc<Aligner<Built>>,
165) {
166 loop {
167 // We use backoff to sleep when we don't have any work
168 let backoff = crossbeam::utils::Backoff::new();
169
170 match work_queue.pop() {
171 Some(WorkQueue::Work(sequence)) => {
172 let alignment = aligner
173 .map(&sequence.1, true, false, None, None, Some(&sequence.0))
174 .expect("Unable to align");
175
176 // Return the original sequence, as well as the mappings
177 // We have to do it this way because ownership
178 let mut result = WorkQueue::Result((sequence, alignment));
179 while let Err(result_packet) = results_queue.push(result) {
180 result = result_packet; // Simple way to maintain ownership
181 // If we have an error, it's 99% because the queue is full
182 backoff.snooze();
183 }
184 }
185 Some(_) => unimplemented!("Unexpected work type"),
186 None => {
187 backoff.snooze();
188
189 // If we have the shutdown signal, we should exit
190 if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() {
191 break;
192 }
193 }
194 }
195 }
196}
More examples
32fn map(
33 target_file: impl AsRef<Path>,
34 query_file: impl AsRef<Path>,
35 threads: usize,
36) -> Result<(), Box<dyn Error>> {
37 // Set the number of threads to use
38 rayon::ThreadPoolBuilder::new()
39 .num_threads(threads)
40 .build_global()
41 .expect("Unable to set number of threads");
42
43 println!("Creating index");
44
45 // Aligner gets created using the build pattern.
46 // Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
47 let aligner = Aligner::builder()
48 .map_ont()
49 .with_cigar()
50 .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
51 .with_index(target_file, None)
52 .expect("Unable to build index");
53
54 println!("Index created");
55
56 // Read in the query file
57 let mut reader = parse_fastx_file(query_file)?;
58
59 let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
60 while let Some(record) = reader.next() {
61 let record = record.expect("Error reading record");
62 queries.push((record.id().to_vec(), record.seq().to_vec()));
63 }
64
65 // Map the queries
66 let results: Vec<Vec<Mapping>> = queries
67 .par_iter()
68 .map(|(id, seq)| {
69 aligner
70 .map(&seq, false, false, None, None, Some(&id))
71 .expect("Error mapping")
72 })
73 .collect();
74
75 // Count total number of alignments
76 let total_alignments: usize = results.iter().map(|x| x.len()).sum();
77 println!("Iteration complete, total alignments {}", total_alignments);
78
79 Ok(())
80}
Sourcepub fn map_file(
&self,
file: &str,
cs: bool,
md: bool,
) -> Result<Vec<Mapping>, &'static str>
pub fn map_file( &self, file: &str, cs: bool, md: bool, ) -> Result<Vec<Mapping>, &'static str>
Maps an entire file. Detects whether the file is gzip-compressed and whether it is FASTQ or FASTA. Best suited to smaller files, because all results are accumulated in a single Vec. For larger inputs, you probably want to loop through the file yourself and call the map() function on each record.
TODO: Remove cs and md and make them options on the struct