bed_utils/bed/
bed_trait.rs

1use bincode::{Decode, Encode};
2use itertools::Itertools;
3use num::Num;
4use num_traits::NumAssignOps;
5
6use super::BedGraph;
7use crate::{bed::{GenomicRange, Score, Strand}, extsort::{ExternalChunkError, ExternalSorterBuilder, SortError}};
8
9use std::{cmp::Ordering, iter::Sum, ops::Neg};
10
11/// Common BED fields
12pub trait BEDLike {
13    /// Return the chromosome name of the record
14    fn chrom(&self) -> &str;
15
16    /// Change the chromosome name of the record
17    fn set_chrom(&mut self, chrom: &str) -> &mut Self;
18
19    /// Return the 0-based start position of the record
20    fn start(&self) -> u64;
21
22    /// Change the 0-based start position of the record
23    fn set_start(&mut self, start: u64) -> &mut Self;
24
25    /// Return the end position (non-inclusive) of the record
26    fn end(&self) -> u64;
27
28    /// Change the end position (non-inclusive) of the record
29    fn set_end(&mut self, end: u64) -> &mut Self;
30
31    /// Return the name of the record
32    fn name(&self) -> Option<&str> {
33        None
34    }
35
36    /// Return the score of the record
37    fn score(&self) -> Option<Score> {
38        None
39    }
40
41    /// Return the strand of the record
42    fn strand(&self) -> Option<Strand> {
43        None
44    }
45
46    /// Return the length of the record. Return 0 if the end position is smaller
47    /// than the start position.
48    fn len(&self) -> u64 {
49        self.end().saturating_sub(self.start())
50    }
51
52    fn compare(&self, other: &Self) -> Ordering {
53        self.chrom()
54            .cmp(other.chrom())
55            .then(self.start().cmp(&other.start()))
56            .then(self.end().cmp(&other.end()))
57    }
58
59    /// Return the overlap
60    fn overlap<B: BEDLike>(&self, other: &B) -> Option<GenomicRange> {
61        if self.chrom() != other.chrom() {
62            None
63        } else {
64            let start = self.start().max(other.start());
65            let end = self.end().min(other.end());
66            if start >= end {
67                None
68            } else {
69                Some(GenomicRange::new(self.chrom(), start, end))
70            }
71        }
72    }
73
74    /// Return the size of overlap between two records
75    fn n_overlap<B: BEDLike>(&self, other: &B) -> u64 {
76        self.overlap(other).map_or(0, |x| x.len())
77    }
78
79    /// Convert the record to a `GenomicRange`
80    fn to_genomic_range(&self) -> GenomicRange {
81        GenomicRange::new(self.chrom(), self.start(), self.end())
82    }
83
84    /// Split into consecutive records with the specified length. The length of
85    /// the last record may be shorter.
86    fn split_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
87        let start = self.start();
88        let end = self.end();
89        (start..end)
90            .step_by(bin_size as usize)
91            .map(move |x| GenomicRange::new(self.chrom(), x, (x + bin_size).min(end)))
92    }
93
94    /// Split into consecutive records with the specified length starting from the end.
95    /// The result is in reverse order compared to `split_by_len`. The length of the last
96    /// record may be shorter.
97    fn rsplit_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
98        let start = self.start();
99        let end = self.end();
100        (start + 1..=end)
101            .rev()
102            .step_by(bin_size as usize)
103            .map(move |x| GenomicRange::new(self.chrom(), x.saturating_sub(bin_size).max(start), x))
104    }
105}
106
107pub trait MergeBed: Iterator {
108    /// Merge sorted BED records. Overlapping records are processed according to the
109    /// function provided.
110    ///
111    /// # Arguments
112    /// * `merger` - A function that takes a vector of BED records and returns an output type.
113    ///
114    /// # Returns
115    /// An iterator that yields the output type produced by the merger function.
116    fn merge_sorted_bed_with<B, O, F>(self, merger: F) -> impl Iterator<Item = O>
117    where
118        Self: Iterator<Item = B> + Sized,
119        B: BEDLike,
120        F: Fn(Vec<B>) -> O,
121    {
122        MergeBedOutput {
123            sorted_bed_iter: self,
124            merger,
125            accum: None,
126        }
127    }
128
129    /// Merge sorted BED records. Overlapping records are concatenated into a single
130    /// record.
131    fn merge_sorted_bed<B>(self) -> impl Iterator<Item = GenomicRange>
132    where
133        Self: Iterator<Item = B> + Sized,
134        B: BEDLike,
135    {
136        self.merge_sorted_bed_with(|x| {
137            GenomicRange::new(
138                x[0].chrom(),
139                x.iter().map(|x| x.start()).min().unwrap(),
140                x.iter().map(|x| x.end()).max().unwrap(),
141            )
142        })
143    }
144
145    fn merge_sorted_bedgraph<V>(self) -> impl Iterator<Item = BedGraph<V>>
146    where
147        Self: Iterator<Item = BedGraph<V>> + Sized,
148        V: Num + NumAssignOps + Sum + Neg<Output = V> + PartialOrd + Copy,
149    {
150        self.merge_sorted_bed_with(|bdgs| {
151            // group by start and end points
152            let point_groups = bdgs
153                .iter()
154                .flat_map(|bedgraph| {
155                    [
156                        (bedgraph.start(), bedgraph.value),
157                        (bedgraph.end(), bedgraph.value.neg()),
158                    ]
159                })
160                .sorted_unstable_by_key(|x| x.0)
161                .chunk_by(|x| (x.0, x.1 < V::zero()));
162            let mut point_groups = point_groups.into_iter();
163
164            let chrom = bdgs[0].chrom();
165            let ((mut prev_pos, _), first_group) = point_groups.next().unwrap();
166            let mut acc_val = first_group.into_iter().map(|x| x.1).sum();
167            let mut prev_bedgraph = BedGraph::new(chrom, prev_pos, prev_pos, acc_val);
168
169            let mut result = point_groups
170                .flat_map(|((pos, _), group)| {
171                    let value_sum = group.into_iter().map(|x| x.1).sum();
172                    let mut bedgraph = None;
173
174                    if prev_pos != pos {
175                        if acc_val == prev_bedgraph.value {
176                            prev_bedgraph.set_end(pos);
177                        } else {
178                            bedgraph = Some(prev_bedgraph.clone());
179                            prev_bedgraph = BedGraph::new(chrom, prev_pos, pos, acc_val);
180                        }
181                    }
182
183                    acc_val += value_sum;
184                    prev_pos = pos;
185                    bedgraph
186                })
187                .collect::<Vec<_>>();
188            result.push(prev_bedgraph);
189            result
190        })
191        .flatten()
192    }
193}
194
195impl<T> MergeBed for T where T: Iterator + ?Sized {}
196
197struct MergeBedOutput<I, B, F> {
198    sorted_bed_iter: I,
199    merger: F,
200    accum: Option<((String, u64, u64), Vec<B>)>,
201}
202
203impl<I, F, B, O> Iterator for MergeBedOutput<I, B, F>
204where
205    I: Iterator<Item = B>,
206    B: BEDLike,
207    F: Fn(Vec<B>) -> O,
208{
209    type Item = O;
210    fn next(&mut self) -> Option<Self::Item> {
211        loop {
212            match self.sorted_bed_iter.next() {
213                None => {
214                    return if self.accum.is_none() {
215                        None
216                    } else {
217                        let (_, accum) = std::mem::replace(&mut self.accum, None).unwrap();
218                        Some((self.merger)(accum))
219                    }
220                }
221                Some(record) => match self.accum.as_mut() {
222                    None => {
223                        self.accum = Some((
224                            (record.chrom().to_string(), record.start(), record.end()),
225                            vec![record],
226                        ))
227                    }
228                    Some(((chr, s, e), accum)) => {
229                        let chr_ = record.chrom();
230                        let s_ = record.start();
231                        let e_ = record.end();
232                        if chr != chr_ || s_ > *e {
233                            let (_, acc) = std::mem::replace(
234                                &mut self.accum,
235                                Some(((chr_.to_string(), s_, e_), vec![record])),
236                            )
237                            .unwrap();
238                            return Some((self.merger)(acc));
239                        } else if s_ < *s {
240                            panic!("input is not sorted")
241                        } else if e_ > *e {
242                            *e = e_;
243                            accum.push(record);
244                        } else {
245                            accum.push(record);
246                        }
247                    }
248                },
249            }
250        }
251    }
252}
253
254#[derive(Debug, Clone)]
255pub struct SortBedOptions {
256    pub num_threads: Option<usize>,
257    pub chunk_size: usize,
258    pub compression: Option<u8>,
259    pub tmp_dir: Option<String>,
260}
261
262impl Default for SortBedOptions {
263    fn default() -> Self {
264        SortBedOptions {
265            num_threads: None,
266            chunk_size: 50000000,
267            compression: Some(1),
268            tmp_dir: None,
269        }
270    }
271}
272
273pub trait SortBed: Iterator {
274    /// Sort the BED records in the iterator.
275    fn sort_bed<B>(self) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
276    where
277        Self: Iterator<Item = B> + Sized,
278        B: BEDLike + Encode + Decode<()> + Send,
279    {
280        self.sort_bed_with_options(SortBedOptions::default())
281    }
282
283    /// Sort the BED records in the iterator.
284    fn sort_bed_with_options<B>(self, opts: SortBedOptions) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
285    where
286        Self: Iterator<Item = B> + Sized,
287        B: BEDLike + Encode + Decode<()> + Send,
288    {
289        let mut sorter = ExternalSorterBuilder::new()
290            .with_chunk_size(opts.chunk_size);
291        if let Some(num_threads) = opts.num_threads {
292            sorter = sorter.num_threads(num_threads);
293        }
294        if let Some(compression) = opts.compression {
295            sorter = sorter.with_compression(compression as u32);
296        }
297        if let Some(tmp_dir) = opts.tmp_dir {
298            sorter = sorter.with_tmp_dir(tmp_dir);
299        }
300        sorter.build().unwrap().sort_by(self, |x: &B, y: &B| x.compare(y))
301    }
302}
303
304impl<T> SortBed for T where T: Iterator + ?Sized {}
305
306#[cfg(test)]
307mod bed_tests {
308    use super::*;
309    use crate::extsort::ExternalSorterBuilder;
310    use itertools::Itertools;
311    use rand::Rng;
312
313    fn rand_bed(n: usize) -> Vec<GenomicRange> {
314        let mut rng = rand::thread_rng();
315        let mut result = Vec::with_capacity(n);
316        for _ in 0..n {
317            let bed = GenomicRange::new(
318                format!("chr{}", rng.gen_range(1..20)),
319                rng.gen_range(0..10000),
320                rng.gen_range(1000000..2000000),
321            );
322            result.push(bed);
323        }
324        result
325    }
326
327    #[test]
328    fn test_sort() {
329        let data1 = vec![
330            GenomicRange::new("chr1", 1020, 1230),
331            GenomicRange::new("chr1", 0, 500),
332            GenomicRange::new("chr2", 500, 1000),
333            GenomicRange::new("chr1", 500, 1000),
334            GenomicRange::new("chr1", 1000, 1230),
335            GenomicRange::new("chr1", 1000, 1230),
336        ];
337        let data2 = rand_bed(100000);
338
339        [data1, data2].into_iter().for_each(|mut data| {
340            let sorted1 = data.clone().into_iter()
341                .sort_bed()
342                .unwrap()
343                .collect::<Result<Vec<_>, _>>()
344                .unwrap();
345            let sorted2 = ExternalSorterBuilder::new()
346                .with_chunk_size(1)
347                .with_compression(4)
348                .build()
349                .unwrap()
350                .sort(data.clone().into_iter())
351                .unwrap()
352                .collect::<Result<Vec<_>, _>>()
353                .unwrap();
354            data.sort();
355            assert_eq!(data, sorted1);
356            assert_eq!(data, sorted2);
357        })
358    }
359
360    #[test]
361    fn test_split() {
362        assert_eq!(
363            GenomicRange::new("chr1", 0, 1230)
364                .split_by_len(500)
365                .collect::<Vec<_>>(),
366            vec![
367                GenomicRange::new("chr1", 0, 500),
368                GenomicRange::new("chr1", 500, 1000),
369                GenomicRange::new("chr1", 1000, 1230),
370            ],
371        );
372        assert_eq!(
373            GenomicRange::new("chr1", 0, 1230)
374                .rsplit_by_len(500)
375                .collect::<Vec<_>>(),
376            vec![
377                GenomicRange::new("chr1", 730, 1230),
378                GenomicRange::new("chr1", 230, 730),
379                GenomicRange::new("chr1", 0, 230),
380            ],
381        );
382        assert_eq!(
383            GenomicRange::new("chr1", 500, 2500)
384                .split_by_len(500)
385                .collect::<Vec<_>>(),
386            GenomicRange::new("chr1", 500, 2500)
387                .rsplit_by_len(500)
388                .sorted()
389                .collect::<Vec<_>>(),
390        );
391        assert_eq!(
392            GenomicRange::new("chr1", 500, 2500)
393                .split_by_len(1)
394                .collect::<Vec<_>>(),
395            GenomicRange::new("chr1", 500, 2500)
396                .rsplit_by_len(1)
397                .sorted()
398                .collect::<Vec<_>>(),
399        );
400    }
401
402    #[test]
403    fn test_overlap() {
404        assert_eq!(
405            GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 150, 300)),
406            Some(GenomicRange::new("chr1", 150, 200)),
407        );
408
409        assert_eq!(
410            GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
411            Some(GenomicRange::new("chr1", 110, 190)),
412        );
413
414        assert_eq!(
415            GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 90, 99)),
416            None,
417        );
418
419        assert_eq!(
420            GenomicRange::new("chr1", 111, 180).overlap(&GenomicRange::new("chr1", 110, 190)),
421            Some(GenomicRange::new("chr1", 111, 180)),
422        );
423
424        assert_eq!(
425            GenomicRange::new("chr1", 111, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
426            Some(GenomicRange::new("chr1", 111, 190)),
427        );
428    }
429
430    #[test]
431    fn test_merge() {
432        let input = [
433            (0, 100),
434            (10, 20),
435            (50, 150),
436            (120, 160),
437            (155, 200),
438            (155, 220),
439            (500, 1000),
440            (2000, 2100),
441            (2100, 2200),
442        ]
443        .into_iter()
444        .map(|(a, b)| std::io::Result::Ok(GenomicRange::new("chr1", a, b)));
445        let expect: Vec<GenomicRange> = [(0, 220), (500, 1000), (2000, 2200)]
446            .into_iter()
447            .map(|(a, b)| GenomicRange::new("chr1", a, b))
448            .collect();
449        assert_eq!(
450            input
451                .clone()
452                .map(|x| x.unwrap())
453                .merge_sorted_bed()
454                .collect::<Vec<_>>(),
455            expect
456        );
457    }
458
459    #[test]
460    fn test_merge_bedgraph() {
461        let reads = vec![
462            BedGraph::new("chr1", 0, 100, 1),
463            BedGraph::new("chr1", 0, 100, 1),
464            BedGraph::new("chr1", 2, 100, 2),
465            BedGraph::new("chr1", 30, 60, 3),
466            BedGraph::new("chr1", 40, 50, 4),
467            BedGraph::new("chr1", 60, 65, 5),
468        ];
469
470        let actual: Vec<_> = reads.into_iter().merge_sorted_bedgraph().collect();
471        let expected = vec![
472            BedGraph::new("chr1", 0, 2, 2),
473            BedGraph::new("chr1", 2, 30, 4),
474            BedGraph::new("chr1", 30, 40, 7),
475            BedGraph::new("chr1", 40, 50, 11),
476            BedGraph::new("chr1", 50, 60, 7),
477            BedGraph::new("chr1", 60, 65, 9),
478            BedGraph::new("chr1", 65, 100, 4),
479        ];
480        assert_eq!(expected, actual);
481    }
482}