perbase_lib/
par_granges.rs

1//! # ParGranges
2//!
3//! Iterates over chunked genomic regions in parallel.
4use anyhow::{Context, Result, anyhow};
5use bio::io::bed;
6use crossbeam::channel::{Receiver, bounded};
7use lazy_static::lazy_static;
8use log::*;
9use num_cpus;
10use rayon::prelude::*;
11use rust_htslib::{
12    bam::{HeaderView, IndexedReader, Read},
13    bcf::{Read as bcfRead, Reader},
14};
15use rust_lapper::{Interval, Lapper};
16use serde::Serialize;
17use std::{convert::TryInto, path::PathBuf, thread};
18
19const BYTES_INA_GIGABYTE: usize = 1024 * 1024 * 1024;
20
21/// A modifier to apply to the channel size formular that is (BYTES_INA_GIGABYTE * channel_size_modifier) * threads / size_of(R::P)
22/// 0.15 roughly corresponds to 1_000_000 PileupPosition objects per thread with some wiggle room.
23pub const CHANNEL_SIZE_MODIFIER: f64 = 0.15;
24
25/// The ideal number of basepairs each worker will receive. Total bp in memory at one time = `threads` * `chunksize`
26pub const CHUNKSIZE: u32 = 1_000_000;
27
28lazy_static! {
29    /// CHANNEL_SIZE_MODIFIER as a str
30    pub static ref CHANNEL_SIZE_MODIFIER_STR: String = CHANNEL_SIZE_MODIFIER.to_string();
31
32    /// CHUNKSIZE as a str
33    pub static ref CHUNKSIZE_STR: String = CHUNKSIZE.to_string();
34}
35
36/// RegionProcessor defines the methods that must be implemented to process a region
37pub trait RegionProcessor {
38    /// A vector of P make up the output of [`process_region`] and
39    /// are values associated with each position.
40    ///
41    /// [`process_region`]: #method.process_region
42    type P: 'static + Send + Sync + Serialize;
43
44    /// A function that takes the tid, start, and stop and returns something serializable.
45    /// Note, a common use of this function will be a `fetch` -> `pileup`. The pileup must
46    /// be bounds checked.
47    fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec<Self::P>;
48}
49
50/// ParGranges holds all the information and configuration needed to launch the
51/// [`ParGranges::process`].
52///
53/// [`ParGranges::process`]: #method.process
54#[derive(Debug)]
55pub struct ParGranges<R: 'static + RegionProcessor + Send + Sync> {
56    /// Path to an indexed BAM / CRAM file
57    reads: PathBuf,
58    /// Optional reference file for CRAM
59    ref_fasta: Option<PathBuf>,
60    /// Optional path to a BED file to restrict the regions iterated over
61    regions_bed: Option<PathBuf>,
62    /// Optional path to a BCF/VCF file to restrict the regions iterated over
63    regions_bcf: Option<PathBuf>,
64    /// If `regions_bed` and or `regions_bcf` is specified, and this is true, merge any overlapping regions to avoid duplicate output.
65    merge_regions: bool,
66    /// Number of threads this is allowed to use, uses all if None
67    threads: usize,
68    /// The ideal number of basepairs each worker will receive. Total bp in memory at one time = `threads` * `chunksize`
69    chunksize: u32,
70    /// A modifier to apply to the channel size formular that is (BYTES_INA_GIGABYTE * channel_size_modifier) * threads / size_of(R::P)
71    channel_size_modifier: f64,
72    /// The rayon threadpool to operate in
73    pool: rayon::ThreadPool,
74    /// The implementation of [RegionProcessor] that will be used to process regions
75    processor: R,
76}
77
78impl<R: RegionProcessor + Send + Sync> ParGranges<R> {
79    /// Create a ParIO object
80    ///
81    /// # Arguments
82    ///
83    /// * `reads`- path to an indexed BAM/CRAM
84    /// * `ref_fasta`- path to an indexed reference file for CRAM
85    /// * `regions_bed`- Optional BED file path restricting the regions to be examined
86    /// * `regions_bcf`- Optional BCF/VCF file path restricting the regions to be examined
87    /// * `merge_regions` - If `regions_bed` and or `regions_bcf` is specified, and this is true, merge any overlapping regions to avoid duplicate output.
88    /// * `threads`- Optional threads to restrict the number of threads this process will use, defaults to all
89    /// * `chunksize`- optional argument to change the default chunksize of 1_000_000. `chunksize` determines the number of bases
90    ///   each worker will get to work on at one time.
91    /// * `channel_size_modifier`- Optional argument to modify the default size ration of the channel that `R::P` is sent on.
92    ///   formula is: ((BYTES_INA_GIGABYTE * channel_size_modifier) * threads) / size_of(R::P)
93    /// * `processor`- Something that implements [`RegionProcessor`](RegionProcessor)
94    #[allow(clippy::too_many_arguments)]
95    pub fn new(
96        reads: PathBuf,
97        ref_fasta: Option<PathBuf>,
98        regions_bed: Option<PathBuf>,
99        regions_bcf: Option<PathBuf>,
100        merge_regions: bool,
101        threads: Option<usize>,
102        chunksize: Option<u32>,
103        channel_size_modifier: Option<f64>,
104        processor: R,
105    ) -> Self {
106        let threads = if let Some(threads) = threads {
107            threads
108        } else {
109            num_cpus::get()
110        };
111
112        // Keep two around for main thread and thread running the pool
113        let threads = std::cmp::max(threads.saturating_sub(2), 1);
114        let pool = rayon::ThreadPoolBuilder::new()
115            .num_threads(threads)
116            .build()
117            .unwrap();
118
119        info!("Using {} worker threads.", threads);
120        Self {
121            reads,
122            ref_fasta,
123            regions_bed,
124            regions_bcf,
125            merge_regions,
126            threads,
127            chunksize: chunksize.unwrap_or(CHUNKSIZE),
128            channel_size_modifier: channel_size_modifier.unwrap_or(CHANNEL_SIZE_MODIFIER),
129            pool,
130            processor,
131        }
132    }
133
134    /// Process each region.
135    ///
136    /// This method splits the sequences in the BAM/CRAM header into `chunksize` * `self.threads` regions (aka 'super chunks').
137    /// It then queries that 'super chunk' against the intervals (either the BED file, or the whole genome broken up into `chunksize`
138    /// regions). The results of that query are then processed by a pool of workers that apply `process_region` to reach interval to
139    /// do perbase analysis on. The collected result for each region is then sent back over the returned `Receiver<R::P>` channel
140    /// for the caller to use. The results will be returned in order according to the order of the intervals used to drive this method.
141    ///
142    /// While one 'super chunk' is being worked on by all workers, the last 'super chunks' results are being printed to either to
143    /// a file or to STDOUT, in order.
144    ///
145    /// Note, a common use case of this will be to fetch a region and do a pileup. The bounds of bases being looked at should still be
146    /// checked since a fetch will pull all reads that overlap the region in question.
147    pub fn process(self) -> Result<Receiver<R::P>> {
148        let channel_size: usize = ((BYTES_INA_GIGABYTE as f64 * self.channel_size_modifier).floor()
149            as usize
150            / std::mem::size_of::<R::P>())
151            * self.threads;
152        info!(
153            "Creating channel of length {:?} (* 120 bytes to get mem)",
154            channel_size
155        );
156        let (snd, rxv) = bounded(channel_size);
157        thread::spawn(move || {
158            self.pool.install(|| {
159                info!("Reading from {:?}", self.reads);
160                let mut reader = IndexedReader::from_path(&self.reads).expect("Indexed BAM/CRAM");
161                // If passed add ref_fasta
162                if let Some(ref_fasta) = &self.ref_fasta {
163                    reader.set_reference(ref_fasta).expect("Set ref");
164                }
165                // Get a copy of the header
166                let header = reader.header().to_owned();
167
168                // Work out if we are restricted to a subset of sites
169                let bed_intervals = if let Some(regions_bed) = &self.regions_bed {
170                    Some(
171                        Self::bed_to_intervals(&header, regions_bed, self.merge_regions)
172                            .expect("Parsed BED to intervals"),
173                    )
174                } else {
175                    None
176                };
177                let bcf_intervals = if let Some(regions_bcf) = &self.regions_bcf {
178                    Some(
179                        Self::bcf_to_intervals(&header, regions_bcf, self.merge_regions)
180                            .expect("Parsed BCF/VCF to intervals"),
181                    )
182                } else {
183                    None
184                };
185                let restricted_ivs = match (bed_intervals, bcf_intervals) {
186                    (Some(bed_ivs), Some(bcf_ivs)) => {
187                        Some(Self::merge_intervals(bed_ivs, bcf_ivs, self.merge_regions))
188                    }
189                    (Some(bed_ivs), None) => Some(bed_ivs),
190                    (None, Some(bcf_ivs)) => Some(bcf_ivs),
191                    (None, None) => None,
192                };
193
194                let intervals = if let Some(restricted) = restricted_ivs {
195                    restricted
196                } else {
197                    Self::header_to_intervals(&header, self.chunksize)
198                        .expect("Parsed BAM/CRAM header to intervals")
199                };
200
201                // The number positions to try to process in one batch
202                let serial_step_size = self.chunksize.saturating_mul(self.threads as u32); // aka superchunk
203                for (tid, intervals) in intervals.into_iter().enumerate() {
204                    let tid: u32 = tid as u32;
205                    let tid_end: u32 = header.target_len(tid).unwrap().try_into().unwrap();
206                    info!("Processing TID {}:0-{}", tid, tid_end);
207                    // Result holds the processed positions to be sent to writer
208                    let mut result = vec![];
209                    for chunk_start in (0..tid_end).step_by(serial_step_size as usize) {
210                        let tid_name = std::str::from_utf8(header.tid2name(tid)).unwrap();
211                        let chunk_end = std::cmp::min(chunk_start + serial_step_size, tid_end);
212                        trace!(
213                            "Batch Processing {}:{}-{}",
214                            tid_name, chunk_start, chunk_end
215                        );
216                        let (r, _) = rayon::join(
217                            || {
218                                // Must be a vec so that par_iter works and results stay in order
219                                let ivs: Vec<Interval<u32, ()>> =
220                                    Lapper::<u32, ()>::find(&intervals, chunk_start, chunk_end)
221                                        // Truncate intervals that extend forward or backward of chunk in question
222                                        .map(|iv| Interval {
223                                            start: std::cmp::max(iv.start, chunk_start),
224                                            stop: std::cmp::min(iv.stop, chunk_end),
225                                            val: (),
226                                        })
227                                        .collect();
228                                ivs.into_par_iter()
229                                    .flat_map(|iv| {
230                                        trace!("Processing {}:{}-{}", tid_name, iv.start, iv.stop);
231                                        self.processor.process_region(tid, iv.start, iv.stop)
232                                    })
233                                    .collect()
234                            },
235                            || {
236                                result.into_iter().for_each(|p| {
237                                    snd.send(p).expect("Sent a serializable to writer")
238                                })
239                            },
240                        );
241                        result = r;
242                    }
243                    // Send final set of results
244                    result
245                        .into_iter()
246                        .for_each(|p| snd.send(p).expect("Sent a serializable to writer"));
247                }
248            });
249        });
250        Ok(rxv)
251    }
252
253    // Convert the header into intervals of equally sized chunks. The last interval may be short.
254    fn header_to_intervals(header: &HeaderView, chunksize: u32) -> Result<Vec<Lapper<u32, ()>>> {
255        let mut intervals = vec![vec![]; header.target_count() as usize];
256        for tid in 0..(header.target_count()) {
257            let tid_len: u32 = header.target_len(tid).unwrap().try_into().unwrap();
258            for start in (0..tid_len).step_by(chunksize as usize) {
259                let stop = std::cmp::min(start + chunksize, tid_len);
260                intervals[tid as usize].push(Interval {
261                    start,
262                    stop,
263                    val: (),
264                });
265            }
266        }
267        Ok(intervals.into_iter().map(Lapper::new).collect())
268    }
269
270    /// Read a bed file into a vector of lappers with the index representing the TID
271    /// if `merge' is true then any overlapping intervals in the sets will be merged.
272    // TODO add a proper error message
273    fn bed_to_intervals(
274        header: &HeaderView,
275        bed_file: &PathBuf,
276        merge: bool,
277    ) -> Result<Vec<Lapper<u32, ()>>> {
278        let mut bed_reader = bed::Reader::from_file(bed_file)?;
279        let mut intervals = vec![vec![]; header.target_count() as usize];
280        for (i, record) in bed_reader.records().enumerate() {
281            let record = record?;
282            let tid = header
283                .tid(record.chrom().as_bytes())
284                .expect("Chromosome not found in BAM/CRAM header");
285            let start = record
286                .start()
287                .try_into()
288                .with_context(|| format!("BED record {} is invalid: unable to parse start", i))?;
289            let stop = record
290                .end()
291                .try_into()
292                .with_context(|| format!("BED record {} is invalid: unable to parse stop", i))?;
293            if stop < start {
294                return Err(anyhow!("BED record {} is invalid: stop < start", i));
295            }
296            intervals[tid as usize].push(Interval {
297                start,
298                stop,
299                val: (),
300            });
301        }
302
303        Ok(intervals
304            .into_iter()
305            .map(|ivs| {
306                let mut lapper = Lapper::new(ivs);
307                if merge {
308                    lapper.merge_overlaps();
309                }
310                lapper
311            })
312            .collect())
313    }
314
315    /// Read a BCF/VCF file into a vector of lappers with index representing the TID
316    /// if `merge' is true then any overlapping intervals in the sets will be merged.
317    fn bcf_to_intervals(
318        header: &HeaderView,
319        bcf_file: &PathBuf,
320        merge: bool,
321    ) -> Result<Vec<Lapper<u32, ()>>> {
322        let mut bcf_reader = Reader::from_path(bcf_file).expect("Error opening BCF/VCF file.");
323        let bcf_header_reader = Reader::from_path(bcf_file).expect("Error opening BCF/VCF file.");
324        let bcf_header = bcf_header_reader.header();
325        let mut intervals = vec![vec![]; header.target_count() as usize];
326        // TODO: validate the headers against eachother
327        for record in bcf_reader.records() {
328            let record = record?;
329            let record_rid = bcf_header.rid2name(record.rid().unwrap()).unwrap();
330            let tid = header
331                .tid(record_rid)
332                .expect("Chromosome not found in BAM/CRAM header");
333            let pos: u32 = record
334                .pos()
335                .try_into()
336                .expect("Got a negative value for pos");
337            intervals[tid as usize].push(Interval {
338                start: pos,
339                stop: pos + 1,
340                val: (),
341            });
342        }
343
344        Ok(intervals
345            .into_iter()
346            .map(|ivs| {
347                let mut lapper = Lapper::new(ivs);
348                if merge {
349                    lapper.merge_overlaps();
350                }
351                lapper
352            })
353            .collect())
354    }
355
356    /// Merge two sets of restriction intervals together
357    /// if `merge' is true then any overlapping intervals in the sets will be merged.
358    fn merge_intervals(
359        a_ivs: Vec<Lapper<u32, ()>>,
360        b_ivs: Vec<Lapper<u32, ()>>,
361        merge: bool,
362    ) -> Vec<Lapper<u32, ()>> {
363        let mut intervals = vec![vec![]; a_ivs.len()];
364        for (i, (a_lapper, b_lapper)) in a_ivs.into_iter().zip(b_ivs.into_iter()).enumerate() {
365            intervals[i] = a_lapper.into_iter().chain(b_lapper.into_iter()).collect();
366        }
367        intervals
368            .into_iter()
369            .map(|ivs| {
370                let mut lapper = Lapper::new(ivs);
371                if merge {
372                    lapper.merge_overlaps();
373                }
374                lapper
375            })
376            .collect()
377    }
378}
379
380#[cfg(test)]
381mod test {
382    use super::*;
383    use bio::io::bed;
384    use num_cpus;
385    use proptest::prelude::*;
386    use rust_htslib::{bam, bcf};
387    use rust_lapper::{Interval, Lapper};
388    use std::collections::{HashMap, HashSet};
389    use tempfile::tempdir;
390    // The purpose of these tests is to demonstrate that positions are covered once under a variety of circumstances
391
392    prop_compose! {
393        fn arb_iv_start(max_iv: u64)(start in 0..max_iv/2) -> u64 { start }
394    }
395    prop_compose! {
396        fn arb_iv_size(max_iv: u64)(size in 1..max_iv/2) -> u64 { size }
397    }
398    prop_compose! {
399        // Create an arbitrary interval where the min size == max_iv / 2
400        fn arb_iv(max_iv: u64)(start in arb_iv_start(max_iv), size in arb_iv_size(max_iv)) -> Interval<u64, ()> {
401            Interval {start, stop: start + size, val: ()}
402        }
403    }
404    // Create an arbitrary number of intervals along with the expected number of positions they cover
405    fn arb_ivs(
406        max_iv: u64,    // max iv size
407        max_ivs: usize, // max number of intervals
408    ) -> impl Strategy<Value = (Vec<Interval<u64, ()>>, u64, u64)> {
409        prop::collection::vec(arb_iv(max_iv), 0..max_ivs).prop_map(|vec| {
410            let mut furthest_right = 0;
411            let lapper = Lapper::new(vec.clone());
412            let expected = lapper.cov();
413            for iv in vec.iter() {
414                if iv.stop > furthest_right {
415                    furthest_right = iv.stop;
416                }
417            }
418            (vec, expected, furthest_right)
419        })
420    }
421    // Create arbitrary number of contigs with arbitrary intervals each
422    fn arb_chrs(
423        max_chr: usize, // number of chromosomes to use
424        max_iv: u64,    // max interval size
425        max_ivs: usize, // max number of intervals
426    ) -> impl Strategy<Value = Vec<(Vec<Interval<u64, ()>>, u64, u64)>> {
427        prop::collection::vec(arb_ivs(max_iv, max_ivs), 0..max_chr)
428    }
429    // An empty BAM with correct header
430    // A BED file with the randomly generated intervals (with expected number of positions)
431    // proptest generate random chunksize, cpus
432    proptest! {
433        #[test]
434        // add random chunksize and random cpus
435        // NB: using any larger numbers for this tends to blow up the test runtime
436        fn interval_set(chromosomes in arb_chrs(4, 10_000, 1_000), chunksize in any::<u32>(), cpus in 0..num_cpus::get(), use_bed in any::<bool>(), use_vcf in any::<bool>()) {
437            let tempdir = tempdir().unwrap();
438            let bam_path = tempdir.path().join("test.bam");
439            let bed_path = tempdir.path().join("test.bed");
440            let vcf_path = tempdir.path().join("test.vcf");
441
442            // Build a BAM
443            let mut header = bam::header::Header::new();
444            for (i,chr) in chromosomes.iter().enumerate() {
445                let mut chr_rec = bam::header::HeaderRecord::new(b"SQ");
446                chr_rec.push_tag(b"SN", &i.to_string());
447                chr_rec.push_tag(b"LN", &chr.2.to_string()); // set len as max observed
448                header.push_record(&chr_rec);
449            }
450            let writer = bam::Writer::from_path(&bam_path, &header, bam::Format::Bam).expect("Opened test.bam for writing");
451            drop(writer); // force flush the writer so the header info is written
452            bam::index::build(&bam_path, None, bam::index::Type::Bai, 1).unwrap();
453
454            // Build a bed
455            let mut writer = bed::Writer::to_file(&bed_path).expect("Opened test.bed for writing");
456            for (i, chr) in chromosomes.iter().enumerate() {
457                for iv in chr.0.iter() {
458                    let mut record = bed::Record::new();
459                    record.set_start(iv.start);
460                    record.set_end(iv.stop);
461                    record.set_chrom(&i.to_string());
462                    record.set_score(&0.to_string());
463                    writer.write(&record).expect("Wrote to test.bed");
464                }
465            }
466            drop(writer); // force flush
467
468            // Build a VCF file
469            let mut vcf_truth = HashMap::new();
470            let mut header = bcf::header::Header::new();
471            for (i,chr) in chromosomes.iter().enumerate() {
472                header.push_record(format!("##contig=<ID={},length={}>", &i.to_string(), &chr.2.to_string()).as_bytes());
473            }
474            let mut writer = bcf::Writer::from_path(&vcf_path, &header, true, bcf::Format::Vcf).expect("Failed to open test.vcf for writing");
475            let mut record = writer.empty_record();
476            for (i, chr) in chromosomes.iter().enumerate() {
477                record.set_rid(Some(i as u32));
478                let counter = vcf_truth.entry(i).or_insert(0);
479                let mut seen = HashSet::new();
480                for iv in chr.0.iter() {
481                    if !seen.contains(&iv.start) {
482                        *counter += 1;
483                        seen.insert(iv.start);
484                    }
485                    record.set_pos(iv.start as i64);
486                    writer.write(&record).expect("Failed to write to test.vcf")
487                }
488            }
489
490            drop(writer); // force flush
491            // Create the processor with a dumb impl of processing that just returns positions with no counting
492            let test_processor = TestProcessor {};
493            let par_granges_runner = ParGranges::new(
494                bam_path,
495                None,
496                if use_bed { Some(bed_path) } else { None }, // do one with regions
497                if use_vcf { Some(vcf_path) } else { None }, // do one with vcf regions
498                true,
499                Some(cpus),
500                Some(chunksize),
501                Some(0.002),
502                test_processor
503            );
504            let receiver = par_granges_runner.process().expect("Launch ParGranges Process");
505            let mut chrom_counts = HashMap::new();
506            receiver.into_iter().for_each(|p: PileupPosition| {
507                let positions = chrom_counts.entry(p.ref_seq.parse::<usize>().expect("parsed chr")).or_insert(0u64);
508                *positions += 1
509            });
510
511            // Validate that for each chr we get the expected number of bases
512            for (chrom, positions) in chrom_counts.iter() {
513                if use_bed  && !use_vcf {
514                    // if this was with bed, should be equal to .1
515                    prop_assert_eq!(chromosomes[*chrom].1, *positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].1, positions);
516                } else if use_bed && use_vcf {
517                    // if this was with bed, should be equal to .1, bed restrictions and vcf restrctions should overlap
518                    prop_assert_eq!(chromosomes[*chrom].1, *positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].1, positions);
519                } else if use_vcf && !use_bed {
520                    // total positions should be equal to the number of records for that chr in the vcf
521                    prop_assert_eq!(vcf_truth.get(chrom).unwrap(), positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].1, positions);
522                } else {
523                    // if this was bam only, should be equal to rightmost postion
524                    prop_assert_eq!(chromosomes[*chrom].2, *positions, "chr: {}, expected: {}, found: {}", chrom, chromosomes[*chrom].2, positions);
525                }
526            }
527
528        }
529    }
530
531    use crate::position::{Position, pileup_position::PileupPosition};
532    use smartstring::SmartString;
533    struct TestProcessor {}
534    impl RegionProcessor for TestProcessor {
535        type P = PileupPosition;
536
537        fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec<Self::P> {
538            let mut results = vec![];
539            for i in start..stop {
540                let chr = SmartString::from(&tid.to_string());
541                let pos = PileupPosition::new(chr, i);
542                results.push(pos);
543            }
544            results
545        }
546    }
547}