1use bincode::{Decode, Encode};
2use itertools::Itertools;
3use num::Num;
4use num_traits::NumAssignOps;
5
6use super::BedGraph;
7use crate::{bed::{GenomicRange, Score, Strand}, extsort::{ExternalChunkError, ExternalSorterBuilder, SortError}};
8
9use std::{cmp::Ordering, iter::Sum, ops::Neg};
10
11pub trait BEDLike {
13 fn chrom(&self) -> &str;
15
16 fn set_chrom(&mut self, chrom: &str) -> &mut Self;
18
19 fn start(&self) -> u64;
21
22 fn set_start(&mut self, start: u64) -> &mut Self;
24
25 fn end(&self) -> u64;
27
28 fn set_end(&mut self, end: u64) -> &mut Self;
30
31 fn name(&self) -> Option<&str> {
33 None
34 }
35
36 fn score(&self) -> Option<Score> {
38 None
39 }
40
41 fn strand(&self) -> Option<Strand> {
43 None
44 }
45
46 fn len(&self) -> u64 {
49 self.end().saturating_sub(self.start())
50 }
51
52 fn compare(&self, other: &Self) -> Ordering {
53 self.chrom()
54 .cmp(other.chrom())
55 .then(self.start().cmp(&other.start()))
56 .then(self.end().cmp(&other.end()))
57 }
58
59 fn overlap<B: BEDLike>(&self, other: &B) -> Option<GenomicRange> {
61 if self.chrom() != other.chrom() {
62 None
63 } else {
64 let start = self.start().max(other.start());
65 let end = self.end().min(other.end());
66 if start >= end {
67 None
68 } else {
69 Some(GenomicRange::new(self.chrom(), start, end))
70 }
71 }
72 }
73
74 fn n_overlap<B: BEDLike>(&self, other: &B) -> u64 {
76 self.overlap(other).map_or(0, |x| x.len())
77 }
78
79 fn to_genomic_range(&self) -> GenomicRange {
81 GenomicRange::new(self.chrom(), self.start(), self.end())
82 }
83
84 fn split_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
87 let start = self.start();
88 let end = self.end();
89 (start..end)
90 .step_by(bin_size as usize)
91 .map(move |x| GenomicRange::new(self.chrom(), x, (x + bin_size).min(end)))
92 }
93
94 fn rsplit_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
98 let start = self.start();
99 let end = self.end();
100 (start + 1..=end)
101 .rev()
102 .step_by(bin_size as usize)
103 .map(move |x| GenomicRange::new(self.chrom(), x.saturating_sub(bin_size).max(start), x))
104 }
105}
106
107pub trait MergeBed: Iterator {
108 fn merge_sorted_bed_with<B, O, F>(self, merger: F) -> impl Iterator<Item = O>
117 where
118 Self: Iterator<Item = B> + Sized,
119 B: BEDLike,
120 F: Fn(Vec<B>) -> O,
121 {
122 MergeBedOutput {
123 sorted_bed_iter: self,
124 merger,
125 accum: None,
126 }
127 }
128
129 fn merge_sorted_bed<B>(self) -> impl Iterator<Item = GenomicRange>
132 where
133 Self: Iterator<Item = B> + Sized,
134 B: BEDLike,
135 {
136 self.merge_sorted_bed_with(|x| {
137 GenomicRange::new(
138 x[0].chrom(),
139 x.iter().map(|x| x.start()).min().unwrap(),
140 x.iter().map(|x| x.end()).max().unwrap(),
141 )
142 })
143 }
144
145 fn merge_sorted_bedgraph<V>(self) -> impl Iterator<Item = BedGraph<V>>
146 where
147 Self: Iterator<Item = BedGraph<V>> + Sized,
148 V: Num + NumAssignOps + Sum + Neg<Output = V> + PartialOrd + Copy,
149 {
150 self.merge_sorted_bed_with(|bdgs| {
151 let point_groups = bdgs
153 .iter()
154 .flat_map(|bedgraph| {
155 [
156 (bedgraph.start(), bedgraph.value),
157 (bedgraph.end(), bedgraph.value.neg()),
158 ]
159 })
160 .sorted_unstable_by_key(|x| x.0)
161 .chunk_by(|x| (x.0, x.1 < V::zero()));
162 let mut point_groups = point_groups.into_iter();
163
164 let chrom = bdgs[0].chrom();
165 let ((mut prev_pos, _), first_group) = point_groups.next().unwrap();
166 let mut acc_val = first_group.into_iter().map(|x| x.1).sum();
167 let mut prev_bedgraph = BedGraph::new(chrom, prev_pos, prev_pos, acc_val);
168
169 let mut result = point_groups
170 .flat_map(|((pos, _), group)| {
171 let value_sum = group.into_iter().map(|x| x.1).sum();
172 let mut bedgraph = None;
173
174 if prev_pos != pos {
175 if acc_val == prev_bedgraph.value {
176 prev_bedgraph.set_end(pos);
177 } else {
178 bedgraph = Some(prev_bedgraph.clone());
179 prev_bedgraph = BedGraph::new(chrom, prev_pos, pos, acc_val);
180 }
181 }
182
183 acc_val += value_sum;
184 prev_pos = pos;
185 bedgraph
186 })
187 .collect::<Vec<_>>();
188 result.push(prev_bedgraph);
189 result
190 })
191 .flatten()
192 }
193}
194
195impl<T> MergeBed for T where T: Iterator + ?Sized {}
196
197struct MergeBedOutput<I, B, F> {
198 sorted_bed_iter: I,
199 merger: F,
200 accum: Option<((String, u64, u64), Vec<B>)>,
201}
202
203impl<I, F, B, O> Iterator for MergeBedOutput<I, B, F>
204where
205 I: Iterator<Item = B>,
206 B: BEDLike,
207 F: Fn(Vec<B>) -> O,
208{
209 type Item = O;
210 fn next(&mut self) -> Option<Self::Item> {
211 loop {
212 match self.sorted_bed_iter.next() {
213 None => {
214 return if self.accum.is_none() {
215 None
216 } else {
217 let (_, accum) = std::mem::replace(&mut self.accum, None).unwrap();
218 Some((self.merger)(accum))
219 }
220 }
221 Some(record) => match self.accum.as_mut() {
222 None => {
223 self.accum = Some((
224 (record.chrom().to_string(), record.start(), record.end()),
225 vec![record],
226 ))
227 }
228 Some(((chr, s, e), accum)) => {
229 let chr_ = record.chrom();
230 let s_ = record.start();
231 let e_ = record.end();
232 if chr != chr_ || s_ > *e {
233 let (_, acc) = std::mem::replace(
234 &mut self.accum,
235 Some(((chr_.to_string(), s_, e_), vec![record])),
236 )
237 .unwrap();
238 return Some((self.merger)(acc));
239 } else if s_ < *s {
240 panic!("input is not sorted")
241 } else if e_ > *e {
242 *e = e_;
243 accum.push(record);
244 } else {
245 accum.push(record);
246 }
247 }
248 },
249 }
250 }
251 }
252}
253
254#[derive(Debug, Clone)]
255pub struct SortBedOptions {
256 pub num_threads: Option<usize>,
257 pub chunk_size: usize,
258 pub compression: Option<u8>,
259 pub tmp_dir: Option<String>,
260}
261
262impl Default for SortBedOptions {
263 fn default() -> Self {
264 SortBedOptions {
265 num_threads: None,
266 chunk_size: 50000000,
267 compression: Some(1),
268 tmp_dir: None,
269 }
270 }
271}
272
273pub trait SortBed: Iterator {
274 fn sort_bed<B>(self) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
276 where
277 Self: Iterator<Item = B> + Sized,
278 B: BEDLike + Encode + Decode<()> + Send,
279 {
280 self.sort_bed_with_options(SortBedOptions::default())
281 }
282
283 fn sort_bed_with_options<B>(self, opts: SortBedOptions) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
285 where
286 Self: Iterator<Item = B> + Sized,
287 B: BEDLike + Encode + Decode<()> + Send,
288 {
289 let mut sorter = ExternalSorterBuilder::new()
290 .with_chunk_size(opts.chunk_size);
291 if let Some(num_threads) = opts.num_threads {
292 sorter = sorter.num_threads(num_threads);
293 }
294 if let Some(compression) = opts.compression {
295 sorter = sorter.with_compression(compression as u32);
296 }
297 if let Some(tmp_dir) = opts.tmp_dir {
298 sorter = sorter.with_tmp_dir(tmp_dir);
299 }
300 sorter.build().unwrap().sort_by(self, |x: &B, y: &B| x.compare(y))
301 }
302}
303
304impl<T> SortBed for T where T: Iterator + ?Sized {}
305
306#[cfg(test)]
307mod bed_tests {
308 use super::*;
309 use crate::extsort::ExternalSorterBuilder;
310 use itertools::Itertools;
311 use rand::Rng;
312
313 fn rand_bed(n: usize) -> Vec<GenomicRange> {
314 let mut rng = rand::thread_rng();
315 let mut result = Vec::with_capacity(n);
316 for _ in 0..n {
317 let bed = GenomicRange::new(
318 format!("chr{}", rng.gen_range(1..20)),
319 rng.gen_range(0..10000),
320 rng.gen_range(1000000..2000000),
321 );
322 result.push(bed);
323 }
324 result
325 }
326
327 #[test]
328 fn test_sort() {
329 let data1 = vec![
330 GenomicRange::new("chr1", 1020, 1230),
331 GenomicRange::new("chr1", 0, 500),
332 GenomicRange::new("chr2", 500, 1000),
333 GenomicRange::new("chr1", 500, 1000),
334 GenomicRange::new("chr1", 1000, 1230),
335 GenomicRange::new("chr1", 1000, 1230),
336 ];
337 let data2 = rand_bed(100000);
338
339 [data1, data2].into_iter().for_each(|mut data| {
340 let sorted1 = data.clone().into_iter()
341 .sort_bed()
342 .unwrap()
343 .collect::<Result<Vec<_>, _>>()
344 .unwrap();
345 let sorted2 = ExternalSorterBuilder::new()
346 .with_chunk_size(1)
347 .with_compression(4)
348 .build()
349 .unwrap()
350 .sort(data.clone().into_iter())
351 .unwrap()
352 .collect::<Result<Vec<_>, _>>()
353 .unwrap();
354 data.sort();
355 assert_eq!(data, sorted1);
356 assert_eq!(data, sorted2);
357 })
358 }
359
360 #[test]
361 fn test_split() {
362 assert_eq!(
363 GenomicRange::new("chr1", 0, 1230)
364 .split_by_len(500)
365 .collect::<Vec<_>>(),
366 vec![
367 GenomicRange::new("chr1", 0, 500),
368 GenomicRange::new("chr1", 500, 1000),
369 GenomicRange::new("chr1", 1000, 1230),
370 ],
371 );
372 assert_eq!(
373 GenomicRange::new("chr1", 0, 1230)
374 .rsplit_by_len(500)
375 .collect::<Vec<_>>(),
376 vec![
377 GenomicRange::new("chr1", 730, 1230),
378 GenomicRange::new("chr1", 230, 730),
379 GenomicRange::new("chr1", 0, 230),
380 ],
381 );
382 assert_eq!(
383 GenomicRange::new("chr1", 500, 2500)
384 .split_by_len(500)
385 .collect::<Vec<_>>(),
386 GenomicRange::new("chr1", 500, 2500)
387 .rsplit_by_len(500)
388 .sorted()
389 .collect::<Vec<_>>(),
390 );
391 assert_eq!(
392 GenomicRange::new("chr1", 500, 2500)
393 .split_by_len(1)
394 .collect::<Vec<_>>(),
395 GenomicRange::new("chr1", 500, 2500)
396 .rsplit_by_len(1)
397 .sorted()
398 .collect::<Vec<_>>(),
399 );
400 }
401
402 #[test]
403 fn test_overlap() {
404 assert_eq!(
405 GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 150, 300)),
406 Some(GenomicRange::new("chr1", 150, 200)),
407 );
408
409 assert_eq!(
410 GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
411 Some(GenomicRange::new("chr1", 110, 190)),
412 );
413
414 assert_eq!(
415 GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 90, 99)),
416 None,
417 );
418
419 assert_eq!(
420 GenomicRange::new("chr1", 111, 180).overlap(&GenomicRange::new("chr1", 110, 190)),
421 Some(GenomicRange::new("chr1", 111, 180)),
422 );
423
424 assert_eq!(
425 GenomicRange::new("chr1", 111, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
426 Some(GenomicRange::new("chr1", 111, 190)),
427 );
428 }
429
430 #[test]
431 fn test_merge() {
432 let input = [
433 (0, 100),
434 (10, 20),
435 (50, 150),
436 (120, 160),
437 (155, 200),
438 (155, 220),
439 (500, 1000),
440 (2000, 2100),
441 (2100, 2200),
442 ]
443 .into_iter()
444 .map(|(a, b)| std::io::Result::Ok(GenomicRange::new("chr1", a, b)));
445 let expect: Vec<GenomicRange> = [(0, 220), (500, 1000), (2000, 2200)]
446 .into_iter()
447 .map(|(a, b)| GenomicRange::new("chr1", a, b))
448 .collect();
449 assert_eq!(
450 input
451 .clone()
452 .map(|x| x.unwrap())
453 .merge_sorted_bed()
454 .collect::<Vec<_>>(),
455 expect
456 );
457 }
458
459 #[test]
460 fn test_merge_bedgraph() {
461 let reads = vec![
462 BedGraph::new("chr1", 0, 100, 1),
463 BedGraph::new("chr1", 0, 100, 1),
464 BedGraph::new("chr1", 2, 100, 2),
465 BedGraph::new("chr1", 30, 60, 3),
466 BedGraph::new("chr1", 40, 50, 4),
467 BedGraph::new("chr1", 60, 65, 5),
468 ];
469
470 let actual: Vec<_> = reads.into_iter().merge_sorted_bedgraph().collect();
471 let expected = vec![
472 BedGraph::new("chr1", 0, 2, 2),
473 BedGraph::new("chr1", 2, 30, 4),
474 BedGraph::new("chr1", 30, 40, 7),
475 BedGraph::new("chr1", 40, 50, 11),
476 BedGraph::new("chr1", 50, 60, 7),
477 BedGraph::new("chr1", 60, 65, 9),
478 BedGraph::new("chr1", 65, 100, 4),
479 ];
480 assert_eq!(expected, actual);
481 }
482}