1use anyhow::{Context, Result, anyhow};
5use bio::io::bed;
6use crossbeam::channel::{Receiver, bounded};
7use lazy_static::lazy_static;
8use log::*;
9use num_cpus;
10use rayon::prelude::*;
11use rust_htslib::{
12 bam::{HeaderView, IndexedReader, Read},
13 bcf::{Read as bcfRead, Reader},
14};
15use rust_lapper::{Interval, Lapper};
16use serde::Serialize;
17use std::{convert::TryInto, path::PathBuf, thread};
18
19const BYTES_INA_GIGABYTE: usize = 1024 * 1024 * 1024;
20
21pub const CHANNEL_SIZE_MODIFIER: f64 = 0.15;
24
25pub const CHUNKSIZE: u32 = 1_000_000;
27
28lazy_static! {
29 pub static ref CHANNEL_SIZE_MODIFIER_STR: String = CHANNEL_SIZE_MODIFIER.to_string();
31
32 pub static ref CHUNKSIZE_STR: String = CHUNKSIZE.to_string();
34}
35
36pub trait RegionProcessor {
38 type P: 'static + Send + Sync + Serialize;
43
44 fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec<Self::P>;
48}
49
50#[derive(Debug)]
55pub struct ParGranges<R: 'static + RegionProcessor + Send + Sync> {
56 reads: PathBuf,
58 ref_fasta: Option<PathBuf>,
60 regions_bed: Option<PathBuf>,
62 regions_bcf: Option<PathBuf>,
64 merge_regions: bool,
66 threads: usize,
68 chunksize: u32,
70 channel_size_modifier: f64,
72 pool: rayon::ThreadPool,
74 processor: R,
76}
77
78impl<R: RegionProcessor + Send + Sync> ParGranges<R> {
79 #[allow(clippy::too_many_arguments)]
95 pub fn new(
96 reads: PathBuf,
97 ref_fasta: Option<PathBuf>,
98 regions_bed: Option<PathBuf>,
99 regions_bcf: Option<PathBuf>,
100 merge_regions: bool,
101 threads: Option<usize>,
102 chunksize: Option<u32>,
103 channel_size_modifier: Option<f64>,
104 processor: R,
105 ) -> Self {
106 let threads = if let Some(threads) = threads {
107 threads
108 } else {
109 num_cpus::get()
110 };
111
112 let threads = std::cmp::max(threads.saturating_sub(2), 1);
114 let pool = rayon::ThreadPoolBuilder::new()
115 .num_threads(threads)
116 .build()
117 .unwrap();
118
119 info!("Using {} worker threads.", threads);
120 Self {
121 reads,
122 ref_fasta,
123 regions_bed,
124 regions_bcf,
125 merge_regions,
126 threads,
127 chunksize: chunksize.unwrap_or(CHUNKSIZE),
128 channel_size_modifier: channel_size_modifier.unwrap_or(CHANNEL_SIZE_MODIFIER),
129 pool,
130 processor,
131 }
132 }
133
134 pub fn process(self) -> Result<Receiver<R::P>> {
148 let channel_size: usize = ((BYTES_INA_GIGABYTE as f64 * self.channel_size_modifier).floor()
149 as usize
150 / std::mem::size_of::<R::P>())
151 * self.threads;
152 info!(
153 "Creating channel of length {:?} (* 120 bytes to get mem)",
154 channel_size
155 );
156 let (snd, rxv) = bounded(channel_size);
157 thread::spawn(move || {
158 self.pool.install(|| {
159 info!("Reading from {:?}", self.reads);
160 let mut reader = IndexedReader::from_path(&self.reads).expect("Indexed BAM/CRAM");
161 if let Some(ref_fasta) = &self.ref_fasta {
163 reader.set_reference(ref_fasta).expect("Set ref");
164 }
165 let header = reader.header().to_owned();
167
168 let bed_intervals = if let Some(regions_bed) = &self.regions_bed {
170 Some(
171 Self::bed_to_intervals(&header, regions_bed, self.merge_regions)
172 .expect("Parsed BED to intervals"),
173 )
174 } else {
175 None
176 };
177 let bcf_intervals = if let Some(regions_bcf) = &self.regions_bcf {
178 Some(
179 Self::bcf_to_intervals(&header, regions_bcf, self.merge_regions)
180 .expect("Parsed BCF/VCF to intervals"),
181 )
182 } else {
183 None
184 };
185 let restricted_ivs = match (bed_intervals, bcf_intervals) {
186 (Some(bed_ivs), Some(bcf_ivs)) => {
187 Some(Self::merge_intervals(bed_ivs, bcf_ivs, self.merge_regions))
188 }
189 (Some(bed_ivs), None) => Some(bed_ivs),
190 (None, Some(bcf_ivs)) => Some(bcf_ivs),
191 (None, None) => None,
192 };
193
194 let intervals = if let Some(restricted) = restricted_ivs {
195 restricted
196 } else {
197 Self::header_to_intervals(&header, self.chunksize)
198 .expect("Parsed BAM/CRAM header to intervals")
199 };
200
201 let serial_step_size = self.chunksize.saturating_mul(self.threads as u32); for (tid, intervals) in intervals.into_iter().enumerate() {
204 let tid: u32 = tid as u32;
205 let tid_end: u32 = header.target_len(tid).unwrap().try_into().unwrap();
206 info!("Processing TID {}:0-{}", tid, tid_end);
207 let mut result = vec![];
209 for chunk_start in (0..tid_end).step_by(serial_step_size as usize) {
210 let tid_name = std::str::from_utf8(header.tid2name(tid)).unwrap();
211 let chunk_end = std::cmp::min(chunk_start + serial_step_size, tid_end);
212 trace!(
213 "Batch Processing {}:{}-{}",
214 tid_name, chunk_start, chunk_end
215 );
216 let (r, _) = rayon::join(
217 || {
218 let ivs: Vec<Interval<u32, ()>> =
220 Lapper::<u32, ()>::find(&intervals, chunk_start, chunk_end)
221 .map(|iv| Interval {
223 start: std::cmp::max(iv.start, chunk_start),
224 stop: std::cmp::min(iv.stop, chunk_end),
225 val: (),
226 })
227 .collect();
228 ivs.into_par_iter()
229 .flat_map(|iv| {
230 trace!("Processing {}:{}-{}", tid_name, iv.start, iv.stop);
231 self.processor.process_region(tid, iv.start, iv.stop)
232 })
233 .collect()
234 },
235 || {
236 result.into_iter().for_each(|p| {
237 snd.send(p).expect("Sent a serializable to writer")
238 })
239 },
240 );
241 result = r;
242 }
243 result
245 .into_iter()
246 .for_each(|p| snd.send(p).expect("Sent a serializable to writer"));
247 }
248 });
249 });
250 Ok(rxv)
251 }
252
253 fn header_to_intervals(header: &HeaderView, chunksize: u32) -> Result<Vec<Lapper<u32, ()>>> {
255 let mut intervals = vec![vec![]; header.target_count() as usize];
256 for tid in 0..(header.target_count()) {
257 let tid_len: u32 = header.target_len(tid).unwrap().try_into().unwrap();
258 for start in (0..tid_len).step_by(chunksize as usize) {
259 let stop = std::cmp::min(start + chunksize, tid_len);
260 intervals[tid as usize].push(Interval {
261 start,
262 stop,
263 val: (),
264 });
265 }
266 }
267 Ok(intervals.into_iter().map(Lapper::new).collect())
268 }
269
270 fn bed_to_intervals(
274 header: &HeaderView,
275 bed_file: &PathBuf,
276 merge: bool,
277 ) -> Result<Vec<Lapper<u32, ()>>> {
278 let mut bed_reader = bed::Reader::from_file(bed_file)?;
279 let mut intervals = vec![vec![]; header.target_count() as usize];
280 for (i, record) in bed_reader.records().enumerate() {
281 let record = record?;
282 let tid = header
283 .tid(record.chrom().as_bytes())
284 .expect("Chromosome not found in BAM/CRAM header");
285 let start = record
286 .start()
287 .try_into()
288 .with_context(|| format!("BED record {} is invalid: unable to parse start", i))?;
289 let stop = record
290 .end()
291 .try_into()
292 .with_context(|| format!("BED record {} is invalid: unable to parse stop", i))?;
293 if stop < start {
294 return Err(anyhow!("BED record {} is invalid: stop < start", i));
295 }
296 intervals[tid as usize].push(Interval {
297 start,
298 stop,
299 val: (),
300 });
301 }
302
303 Ok(intervals
304 .into_iter()
305 .map(|ivs| {
306 let mut lapper = Lapper::new(ivs);
307 if merge {
308 lapper.merge_overlaps();
309 }
310 lapper
311 })
312 .collect())
313 }
314
315 fn bcf_to_intervals(
318 header: &HeaderView,
319 bcf_file: &PathBuf,
320 merge: bool,
321 ) -> Result<Vec<Lapper<u32, ()>>> {
322 let mut bcf_reader = Reader::from_path(bcf_file).expect("Error opening BCF/VCF file.");
323 let bcf_header_reader = Reader::from_path(bcf_file).expect("Error opening BCF/VCF file.");
324 let bcf_header = bcf_header_reader.header();
325 let mut intervals = vec![vec![]; header.target_count() as usize];
326 for record in bcf_reader.records() {
328 let record = record?;
329 let record_rid = bcf_header.rid2name(record.rid().unwrap()).unwrap();
330 let tid = header
331 .tid(record_rid)
332 .expect("Chromosome not found in BAM/CRAM header");
333 let pos: u32 = record
334 .pos()
335 .try_into()
336 .expect("Got a negative value for pos");
337 intervals[tid as usize].push(Interval {
338 start: pos,
339 stop: pos + 1,
340 val: (),
341 });
342 }
343
344 Ok(intervals
345 .into_iter()
346 .map(|ivs| {
347 let mut lapper = Lapper::new(ivs);
348 if merge {
349 lapper.merge_overlaps();
350 }
351 lapper
352 })
353 .collect())
354 }
355
356 fn merge_intervals(
359 a_ivs: Vec<Lapper<u32, ()>>,
360 b_ivs: Vec<Lapper<u32, ()>>,
361 merge: bool,
362 ) -> Vec<Lapper<u32, ()>> {
363 let mut intervals = vec![vec![]; a_ivs.len()];
364 for (i, (a_lapper, b_lapper)) in a_ivs.into_iter().zip(b_ivs.into_iter()).enumerate() {
365 intervals[i] = a_lapper.into_iter().chain(b_lapper.into_iter()).collect();
366 }
367 intervals
368 .into_iter()
369 .map(|ivs| {
370 let mut lapper = Lapper::new(ivs);
371 if merge {
372 lapper.merge_overlaps();
373 }
374 lapper
375 })
376 .collect()
377 }
378}
379
380#[cfg(test)]
381mod test {
382 use super::*;
383 use bio::io::bed;
384 use num_cpus;
385 use proptest::prelude::*;
386 use rust_htslib::{bam, bcf};
387 use rust_lapper::{Interval, Lapper};
388 use std::collections::{HashMap, HashSet};
389 use tempfile::tempdir;
390 prop_compose! {
393 fn arb_iv_start(max_iv: u64)(start in 0..max_iv/2) -> u64 { start }
394 }
395 prop_compose! {
396 fn arb_iv_size(max_iv: u64)(size in 1..max_iv/2) -> u64 { size }
397 }
398 prop_compose! {
399 fn arb_iv(max_iv: u64)(start in arb_iv_start(max_iv), size in arb_iv_size(max_iv)) -> Interval<u64, ()> {
401 Interval {start, stop: start + size, val: ()}
402 }
403 }
404 fn arb_ivs(
406 max_iv: u64, max_ivs: usize, ) -> impl Strategy<Value = (Vec<Interval<u64, ()>>, u64, u64)> {
409 prop::collection::vec(arb_iv(max_iv), 0..max_ivs).prop_map(|vec| {
410 let mut furthest_right = 0;
411 let lapper = Lapper::new(vec.clone());
412 let expected = lapper.cov();
413 for iv in vec.iter() {
414 if iv.stop > furthest_right {
415 furthest_right = iv.stop;
416 }
417 }
418 (vec, expected, furthest_right)
419 })
420 }
421 fn arb_chrs(
423 max_chr: usize, max_iv: u64, max_ivs: usize, ) -> impl Strategy<Value = Vec<(Vec<Interval<u64, ()>>, u64, u64)>> {
427 prop::collection::vec(arb_ivs(max_iv, max_ivs), 0..max_chr)
428 }
429 proptest! {
433 #[test]
434 fn interval_set(chromosomes in arb_chrs(4, 10_000, 1_000), chunksize in any::<u32>(), cpus in 0..num_cpus::get(), use_bed in any::<bool>(), use_vcf in any::<bool>()) {
437 let tempdir = tempdir().unwrap();
438 let bam_path = tempdir.path().join("test.bam");
439 let bed_path = tempdir.path().join("test.bed");
440 let vcf_path = tempdir.path().join("test.vcf");
441
442 let mut header = bam::header::Header::new();
444 for (i,chr) in chromosomes.iter().enumerate() {
445 let mut chr_rec = bam::header::HeaderRecord::new(b"SQ");
446 chr_rec.push_tag(b"SN", &i.to_string());
447 chr_rec.push_tag(b"LN", &chr.2.to_string()); header.push_record(&chr_rec);
449 }
450 let writer = bam::Writer::from_path(&bam_path, &header, bam::Format::Bam).expect("Opened test.bam for writing");
451 drop(writer); bam::index::build(&bam_path, None, bam::index::Type::Bai, 1).unwrap();
453
454 let mut writer = bed::Writer::to_file(&bed_path).expect("Opened test.bed for writing");
456 for (i, chr) in chromosomes.iter().enumerate() {
457 for iv in chr.0.iter() {
458 let mut record = bed::Record::new();
459 record.set_start(iv.start);
460 record.set_end(iv.stop);
461 record.set_chrom(&i.to_string());
462 record.set_score(&0.to_string());
463 writer.write(&record).expect("Wrote to test.bed");
464 }
465 }
466 drop(writer); let mut vcf_truth = HashMap::new();
470 let mut header = bcf::header::Header::new();
471 for (i,chr) in chromosomes.iter().enumerate() {
472 header.push_record(format!("##contig=<ID={},length={}>", &i.to_string(), &chr.2.to_string()).as_bytes());
473 }
474 let mut writer = bcf::Writer::from_path(&vcf_path, &header, true, bcf::Format::Vcf).expect("Failed to open test.vcf for writing");
475 let mut record = writer.empty_record();
476 for (i, chr) in chromosomes.iter().enumerate() {
477 record.set_rid(Some(i as u32));
478 let counter = vcf_truth.entry(i).or_insert(0);
479 let mut seen = HashSet::new();
480 for iv in chr.0.iter() {
481 if !seen.contains(&iv.start) {
482 *counter += 1;
483 seen.insert(iv.start);
484 }
485 record.set_pos(iv.start as i64);
486 writer.write(&record).expect("Failed to write to test.vcf")
487 }
488 }
489
490 drop(writer); let test_processor = TestProcessor {};
493 let par_granges_runner = ParGranges::new(
494 bam_path,
495 None,
496 if use_bed { Some(bed_path) } else { None }, if use_vcf { Some(vcf_path) } else { None }, true,
499 Some(cpus),
500 Some(chunksize),
501 Some(0.002),
502 test_processor
503 );
504 let receiver = par_granges_runner.process().expect("Launch ParGranges Process");
505 let mut chrom_counts = HashMap::new();
506 receiver.into_iter().for_each(|p: PileupPosition| {
507 let positions = chrom_counts.entry(p.ref_seq.parse::<usize>().expect("parsed chr")).or_insert(0u64);
508 *positions += 1
509 });
510
511 for (chrom, positions) in chrom_counts.iter() {
513 if use_bed && !use_vcf {
514 prop_assert_eq!(chromosomes[*chrom].1, *positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].1, positions);
516 } else if use_bed && use_vcf {
517 prop_assert_eq!(chromosomes[*chrom].1, *positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].1, positions);
519 } else if use_vcf && !use_bed {
520 prop_assert_eq!(vcf_truth.get(chrom).unwrap(), positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].1, positions);
522 } else {
523 prop_assert_eq!(chromosomes[*chrom].2, *positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].2, positions);
525 }
526 }
527
528 }
529 }
530
531 use crate::position::{Position, pileup_position::PileupPosition};
532 use smartstring::SmartString;
533 struct TestProcessor {}
534 impl RegionProcessor for TestProcessor {
535 type P = PileupPosition;
536
537 fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec<Self::P> {
538 let mut results = vec![];
539 for i in start..stop {
540 let chr = SmartString::from(&tid.to_string());
541 let pos = PileupPosition::new(chr, i);
542 results.push(pos);
543 }
544 results
545 }
546 }
547}