Skip to main content

bed_utils/bed/
bed_trait.rs

1use itertools::Itertools;
2use num::Num;
3use num_traits::NumAssignOps;
4use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
5
6use super::BedGraph;
7use crate::{
8    bed::{GenomicRange, Score, Strand},
9    extsort::{
10        DiskDeserializer, DiskSerializer, ExternalChunkError, ExternalSorterBuilder, SortError,
11    },
12};
13
14use std::{cmp::Ordering, iter::Sum, ops::Neg};
15
16/// Common BED fields
17pub trait BEDLike {
18    /// Return the chromosome name of the record
19    fn chrom(&self) -> &str;
20
21    /// Change the chromosome name of the record
22    fn set_chrom(&mut self, chrom: &str) -> &mut Self;
23
24    /// Return the 0-based start position of the record
25    fn start(&self) -> u64;
26
27    /// Change the 0-based start position of the record
28    fn set_start(&mut self, start: u64) -> &mut Self;
29
30    /// Return the end position (non-inclusive) of the record
31    fn end(&self) -> u64;
32
33    /// Change the end position (non-inclusive) of the record
34    fn set_end(&mut self, end: u64) -> &mut Self;
35
36    /// Return the name of the record
37    fn name(&self) -> Option<&str> {
38        None
39    }
40
41    /// Return the score of the record
42    fn score(&self) -> Option<Score> {
43        None
44    }
45
46    /// Return the strand of the record
47    fn strand(&self) -> Option<Strand> {
48        None
49    }
50
51    /// Return the length of the record. Return 0 if the end position is smaller
52    /// than the start position.
53    fn len(&self) -> u64 {
54        self.end().saturating_sub(self.start())
55    }
56
57    fn compare(&self, other: &Self) -> Ordering {
58        self.chrom()
59            .cmp(other.chrom())
60            .then(self.start().cmp(&other.start()))
61            .then(self.end().cmp(&other.end()))
62    }
63
64    /// Return the overlap
65    fn overlap<B: BEDLike>(&self, other: &B) -> Option<GenomicRange> {
66        if self.chrom() != other.chrom() {
67            None
68        } else {
69            let start = self.start().max(other.start());
70            let end = self.end().min(other.end());
71            if start >= end {
72                None
73            } else {
74                Some(GenomicRange::new(self.chrom(), start, end))
75            }
76        }
77    }
78
79    /// Return the size of overlap between two records
80    fn n_overlap<B: BEDLike>(&self, other: &B) -> u64 {
81        self.overlap(other).map_or(0, |x| x.len())
82    }
83
84    /// Convert the record to a `GenomicRange`
85    fn to_genomic_range(&self) -> GenomicRange {
86        GenomicRange::new(self.chrom(), self.start(), self.end())
87    }
88
89    /// Split into consecutive records with the specified length. The length of
90    /// the last record may be shorter.
91    fn split_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
92        let start = self.start();
93        let end = self.end();
94        (start..end)
95            .step_by(bin_size as usize)
96            .map(move |x| GenomicRange::new(self.chrom(), x, (x + bin_size).min(end)))
97    }
98
99    /// Split into consecutive records with the specified length starting from the end.
100    /// The result is in reverse order compared to `split_by_len`. The length of the last
101    /// record may be shorter.
102    fn rsplit_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
103        let start = self.start();
104        let end = self.end();
105        (start + 1..=end)
106            .rev()
107            .step_by(bin_size as usize)
108            .map(move |x| GenomicRange::new(self.chrom(), x.saturating_sub(bin_size).max(start), x))
109    }
110}
111
112pub trait MergeBed: Iterator {
113    /// Merge sorted BED records. Overlapping records are processed according to the
114    /// function provided.
115    ///
116    /// # Arguments
117    /// * `merger` - A function that takes a vector of BED records and returns an output type.
118    ///
119    /// # Returns
120    /// An iterator that yields the output type produced by the merger function.
121    fn merge_sorted_bed_with<B, O, F>(self, merger: F) -> impl Iterator<Item = O>
122    where
123        Self: Iterator<Item = B> + Sized,
124        B: BEDLike,
125        F: Fn(Vec<B>) -> O,
126    {
127        MergeBedOutput {
128            sorted_bed_iter: self,
129            merger,
130            accum: None,
131        }
132    }
133
134    /// Merge sorted BED records. Overlapping records are concatenated into a single
135    /// record.
136    fn merge_sorted_bed<B>(self) -> impl Iterator<Item = GenomicRange>
137    where
138        Self: Iterator<Item = B> + Sized,
139        B: BEDLike,
140    {
141        self.merge_sorted_bed_with(|x| {
142            GenomicRange::new(
143                x[0].chrom(),
144                x.iter().map(|x| x.start()).min().unwrap(),
145                x.iter().map(|x| x.end()).max().unwrap(),
146            )
147        })
148    }
149
150    fn merge_sorted_bedgraph<V>(self) -> impl Iterator<Item = BedGraph<V>>
151    where
152        Self: Iterator<Item = BedGraph<V>> + Sized,
153        V: Num + NumAssignOps + Sum + Neg<Output = V> + PartialOrd + Copy,
154    {
155        self.merge_sorted_bed_with(|bdgs| {
156            // group by start and end points
157            let point_groups = bdgs
158                .iter()
159                .flat_map(|bedgraph| {
160                    [
161                        (bedgraph.start(), bedgraph.value),
162                        (bedgraph.end(), bedgraph.value.neg()),
163                    ]
164                })
165                .sorted_unstable_by_key(|x| x.0)
166                .chunk_by(|x| (x.0, x.1 < V::zero()));
167            let mut point_groups = point_groups.into_iter();
168
169            let chrom = bdgs[0].chrom();
170            let ((mut prev_pos, _), first_group) = point_groups.next().unwrap();
171            let mut acc_val = first_group.into_iter().map(|x| x.1).sum();
172            let mut prev_bedgraph = BedGraph::new(chrom, prev_pos, prev_pos, acc_val);
173
174            let mut result = point_groups
175                .flat_map(|((pos, _), group)| {
176                    let value_sum = group.into_iter().map(|x| x.1).sum();
177                    let mut bedgraph = None;
178
179                    if prev_pos != pos {
180                        if acc_val == prev_bedgraph.value {
181                            prev_bedgraph.set_end(pos);
182                        } else {
183                            bedgraph = Some(prev_bedgraph.clone());
184                            prev_bedgraph = BedGraph::new(chrom, prev_pos, pos, acc_val);
185                        }
186                    }
187
188                    acc_val += value_sum;
189                    prev_pos = pos;
190                    bedgraph
191                })
192                .collect::<Vec<_>>();
193            result.push(prev_bedgraph);
194            result
195        })
196        .flatten()
197    }
198}
199
200impl<T> MergeBed for T where T: Iterator + ?Sized {}
201
202struct MergeBedOutput<I, B, F> {
203    sorted_bed_iter: I,
204    merger: F,
205    accum: Option<((String, u64, u64), Vec<B>)>,
206}
207
208impl<I, F, B, O> Iterator for MergeBedOutput<I, B, F>
209where
210    I: Iterator<Item = B>,
211    B: BEDLike,
212    F: Fn(Vec<B>) -> O,
213{
214    type Item = O;
215    fn next(&mut self) -> Option<Self::Item> {
216        loop {
217            match self.sorted_bed_iter.next() {
218                None => {
219                    return if self.accum.is_none() {
220                        None
221                    } else {
222                        let (_, accum) = std::mem::replace(&mut self.accum, None).unwrap();
223                        Some((self.merger)(accum))
224                    }
225                }
226                Some(record) => match self.accum.as_mut() {
227                    None => {
228                        self.accum = Some((
229                            (record.chrom().to_string(), record.start(), record.end()),
230                            vec![record],
231                        ))
232                    }
233                    Some(((chr, s, e), accum)) => {
234                        let chr_ = record.chrom();
235                        let s_ = record.start();
236                        let e_ = record.end();
237                        if chr != chr_ || s_ > *e {
238                            let (_, acc) = std::mem::replace(
239                                &mut self.accum,
240                                Some(((chr_.to_string(), s_, e_), vec![record])),
241                            )
242                            .unwrap();
243                            return Some((self.merger)(acc));
244                        } else if s_ < *s {
245                            panic!("input is not sorted")
246                        } else if e_ > *e {
247                            *e = e_;
248                            accum.push(record);
249                        } else {
250                            accum.push(record);
251                        }
252                    }
253                },
254            }
255        }
256    }
257}
258
259#[derive(Debug, Clone)]
260pub struct SortBedOptions {
261    pub num_threads: Option<usize>,
262    pub chunk_size: usize,
263    pub compression: Option<u8>,
264    pub tmp_dir: Option<String>,
265}
266
267impl Default for SortBedOptions {
268    fn default() -> Self {
269        SortBedOptions {
270            num_threads: None,
271            chunk_size: 50000000,
272            compression: Some(1),
273            tmp_dir: None,
274        }
275    }
276}
277
278pub trait SortBed: Iterator {
279    /// Sort the BED records in the iterator.
280    fn sort_bed<B>(
281        self,
282    ) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
283    where
284        Self: Iterator<Item = B> + Sized,
285        B: BEDLike + Archive + for<'a> RkyvSerialize<DiskSerializer<'a>> + Send,
286        B::Archived: RkyvDeserialize<B, DiskDeserializer>,
287    {
288        self.sort_bed_with_options(SortBedOptions::default())
289    }
290
291    /// Sort the BED records in the iterator.
292    fn sort_bed_with_options<B>(
293        self,
294        opts: SortBedOptions,
295    ) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
296    where
297        Self: Iterator<Item = B> + Sized,
298        B: BEDLike + Archive + for<'a> RkyvSerialize<DiskSerializer<'a>> + Send,
299        B::Archived: RkyvDeserialize<B, DiskDeserializer>,
300    {
301        let mut sorter = ExternalSorterBuilder::new().with_chunk_size(opts.chunk_size);
302        if let Some(num_threads) = opts.num_threads {
303            sorter = sorter.num_threads(num_threads);
304        }
305        if let Some(compression) = opts.compression {
306            sorter = sorter.with_compression(compression as u32);
307        }
308        if let Some(tmp_dir) = opts.tmp_dir {
309            sorter = sorter.with_tmp_dir(tmp_dir);
310        }
311        sorter
312            .build()
313            .unwrap()
314            .sort_by(self, |x: &B, y: &B| x.compare(y))
315    }
316}
317
318impl<T> SortBed for T where T: Iterator + ?Sized {}
319
320#[cfg(test)]
321mod bed_tests {
322    use super::*;
323    use crate::extsort::ExternalSorterBuilder;
324    use itertools::Itertools;
325    use rand::Rng;
326
327    fn rand_bed(n: usize) -> Vec<GenomicRange> {
328        let mut rng = rand::thread_rng();
329        let mut result = Vec::with_capacity(n);
330        for _ in 0..n {
331            let bed = GenomicRange::new(
332                format!("chr{}", rng.gen_range(1..20)),
333                rng.gen_range(0..10000),
334                rng.gen_range(1000000..2000000),
335            );
336            result.push(bed);
337        }
338        result
339    }
340
341    #[test]
342    fn test_sort() {
343        let data1 = vec![
344            GenomicRange::new("chr1", 1020, 1230),
345            GenomicRange::new("chr1", 0, 500),
346            GenomicRange::new("chr2", 500, 1000),
347            GenomicRange::new("chr1", 500, 1000),
348            GenomicRange::new("chr1", 1000, 1230),
349            GenomicRange::new("chr1", 1000, 1230),
350        ];
351        let data2 = rand_bed(100000);
352
353        [data1, data2].into_iter().for_each(|mut data| {
354            let sorted1 = data
355                .clone()
356                .into_iter()
357                .sort_bed()
358                .unwrap()
359                .collect::<Result<Vec<_>, _>>()
360                .unwrap();
361            let sorted2 = ExternalSorterBuilder::new()
362                .with_chunk_size(1)
363                .with_compression(4)
364                .build()
365                .unwrap()
366                .sort(data.clone().into_iter())
367                .unwrap()
368                .collect::<Result<Vec<_>, _>>()
369                .unwrap();
370            data.sort();
371            assert_eq!(data, sorted1);
372            assert_eq!(data, sorted2);
373        })
374    }
375
376    #[test]
377    fn test_split() {
378        assert_eq!(
379            GenomicRange::new("chr1", 0, 1230)
380                .split_by_len(500)
381                .collect::<Vec<_>>(),
382            vec![
383                GenomicRange::new("chr1", 0, 500),
384                GenomicRange::new("chr1", 500, 1000),
385                GenomicRange::new("chr1", 1000, 1230),
386            ],
387        );
388        assert_eq!(
389            GenomicRange::new("chr1", 0, 1230)
390                .rsplit_by_len(500)
391                .collect::<Vec<_>>(),
392            vec![
393                GenomicRange::new("chr1", 730, 1230),
394                GenomicRange::new("chr1", 230, 730),
395                GenomicRange::new("chr1", 0, 230),
396            ],
397        );
398        assert_eq!(
399            GenomicRange::new("chr1", 500, 2500)
400                .split_by_len(500)
401                .collect::<Vec<_>>(),
402            GenomicRange::new("chr1", 500, 2500)
403                .rsplit_by_len(500)
404                .sorted()
405                .collect::<Vec<_>>(),
406        );
407        assert_eq!(
408            GenomicRange::new("chr1", 500, 2500)
409                .split_by_len(1)
410                .collect::<Vec<_>>(),
411            GenomicRange::new("chr1", 500, 2500)
412                .rsplit_by_len(1)
413                .sorted()
414                .collect::<Vec<_>>(),
415        );
416    }
417
418    #[test]
419    fn test_overlap() {
420        assert_eq!(
421            GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 150, 300)),
422            Some(GenomicRange::new("chr1", 150, 200)),
423        );
424
425        assert_eq!(
426            GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
427            Some(GenomicRange::new("chr1", 110, 190)),
428        );
429
430        assert_eq!(
431            GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 90, 99)),
432            None,
433        );
434
435        assert_eq!(
436            GenomicRange::new("chr1", 111, 180).overlap(&GenomicRange::new("chr1", 110, 190)),
437            Some(GenomicRange::new("chr1", 111, 180)),
438        );
439
440        assert_eq!(
441            GenomicRange::new("chr1", 111, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
442            Some(GenomicRange::new("chr1", 111, 190)),
443        );
444    }
445
446    #[test]
447    fn test_merge() {
448        let input = [
449            (0, 100),
450            (10, 20),
451            (50, 150),
452            (120, 160),
453            (155, 200),
454            (155, 220),
455            (500, 1000),
456            (2000, 2100),
457            (2100, 2200),
458        ]
459        .into_iter()
460        .map(|(a, b)| std::io::Result::Ok(GenomicRange::new("chr1", a, b)));
461        let expect: Vec<GenomicRange> = [(0, 220), (500, 1000), (2000, 2200)]
462            .into_iter()
463            .map(|(a, b)| GenomicRange::new("chr1", a, b))
464            .collect();
465        assert_eq!(
466            input
467                .clone()
468                .map(|x| x.unwrap())
469                .merge_sorted_bed()
470                .collect::<Vec<_>>(),
471            expect
472        );
473    }
474
475    #[test]
476    fn test_merge_bedgraph() {
477        let reads = vec![
478            BedGraph::new("chr1", 0, 100, 1),
479            BedGraph::new("chr1", 0, 100, 1),
480            BedGraph::new("chr1", 2, 100, 2),
481            BedGraph::new("chr1", 30, 60, 3),
482            BedGraph::new("chr1", 40, 50, 4),
483            BedGraph::new("chr1", 60, 65, 5),
484        ];
485
486        let actual: Vec<_> = reads.into_iter().merge_sorted_bedgraph().collect();
487        let expected = vec![
488            BedGraph::new("chr1", 0, 2, 2),
489            BedGraph::new("chr1", 2, 30, 4),
490            BedGraph::new("chr1", 30, 40, 7),
491            BedGraph::new("chr1", 40, 50, 11),
492            BedGraph::new("chr1", 50, 60, 7),
493            BedGraph::new("chr1", 60, 65, 9),
494            BedGraph::new("chr1", 65, 100, 4),
495        ];
496        assert_eq!(expected, actual);
497    }
498}