1use itertools::Itertools;
2use num::Num;
3use num_traits::NumAssignOps;
4use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
5
6use super::BedGraph;
7use crate::{
8 bed::{GenomicRange, Score, Strand},
9 extsort::{
10 DiskDeserializer, DiskSerializer, ExternalChunkError, ExternalSorterBuilder, SortError,
11 },
12};
13
14use std::{cmp::Ordering, iter::Sum, ops::Neg};
15
16pub trait BEDLike {
18 fn chrom(&self) -> &str;
20
21 fn set_chrom(&mut self, chrom: &str) -> &mut Self;
23
24 fn start(&self) -> u64;
26
27 fn set_start(&mut self, start: u64) -> &mut Self;
29
30 fn end(&self) -> u64;
32
33 fn set_end(&mut self, end: u64) -> &mut Self;
35
36 fn name(&self) -> Option<&str> {
38 None
39 }
40
41 fn score(&self) -> Option<Score> {
43 None
44 }
45
46 fn strand(&self) -> Option<Strand> {
48 None
49 }
50
51 fn len(&self) -> u64 {
54 self.end().saturating_sub(self.start())
55 }
56
57 fn compare(&self, other: &Self) -> Ordering {
58 self.chrom()
59 .cmp(other.chrom())
60 .then(self.start().cmp(&other.start()))
61 .then(self.end().cmp(&other.end()))
62 }
63
64 fn overlap<B: BEDLike>(&self, other: &B) -> Option<GenomicRange> {
66 if self.chrom() != other.chrom() {
67 None
68 } else {
69 let start = self.start().max(other.start());
70 let end = self.end().min(other.end());
71 if start >= end {
72 None
73 } else {
74 Some(GenomicRange::new(self.chrom(), start, end))
75 }
76 }
77 }
78
79 fn n_overlap<B: BEDLike>(&self, other: &B) -> u64 {
81 self.overlap(other).map_or(0, |x| x.len())
82 }
83
84 fn to_genomic_range(&self) -> GenomicRange {
86 GenomicRange::new(self.chrom(), self.start(), self.end())
87 }
88
89 fn split_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
92 let start = self.start();
93 let end = self.end();
94 (start..end)
95 .step_by(bin_size as usize)
96 .map(move |x| GenomicRange::new(self.chrom(), x, (x + bin_size).min(end)))
97 }
98
99 fn rsplit_by_len(&self, bin_size: u64) -> impl Iterator<Item = GenomicRange> {
103 let start = self.start();
104 let end = self.end();
105 (start + 1..=end)
106 .rev()
107 .step_by(bin_size as usize)
108 .map(move |x| GenomicRange::new(self.chrom(), x.saturating_sub(bin_size).max(start), x))
109 }
110}
111
112pub trait MergeBed: Iterator {
113 fn merge_sorted_bed_with<B, O, F>(self, merger: F) -> impl Iterator<Item = O>
122 where
123 Self: Iterator<Item = B> + Sized,
124 B: BEDLike,
125 F: Fn(Vec<B>) -> O,
126 {
127 MergeBedOutput {
128 sorted_bed_iter: self,
129 merger,
130 accum: None,
131 }
132 }
133
134 fn merge_sorted_bed<B>(self) -> impl Iterator<Item = GenomicRange>
137 where
138 Self: Iterator<Item = B> + Sized,
139 B: BEDLike,
140 {
141 self.merge_sorted_bed_with(|x| {
142 GenomicRange::new(
143 x[0].chrom(),
144 x.iter().map(|x| x.start()).min().unwrap(),
145 x.iter().map(|x| x.end()).max().unwrap(),
146 )
147 })
148 }
149
150 fn merge_sorted_bedgraph<V>(self) -> impl Iterator<Item = BedGraph<V>>
151 where
152 Self: Iterator<Item = BedGraph<V>> + Sized,
153 V: Num + NumAssignOps + Sum + Neg<Output = V> + PartialOrd + Copy,
154 {
155 self.merge_sorted_bed_with(|bdgs| {
156 let point_groups = bdgs
158 .iter()
159 .flat_map(|bedgraph| {
160 [
161 (bedgraph.start(), bedgraph.value),
162 (bedgraph.end(), bedgraph.value.neg()),
163 ]
164 })
165 .sorted_unstable_by_key(|x| x.0)
166 .chunk_by(|x| (x.0, x.1 < V::zero()));
167 let mut point_groups = point_groups.into_iter();
168
169 let chrom = bdgs[0].chrom();
170 let ((mut prev_pos, _), first_group) = point_groups.next().unwrap();
171 let mut acc_val = first_group.into_iter().map(|x| x.1).sum();
172 let mut prev_bedgraph = BedGraph::new(chrom, prev_pos, prev_pos, acc_val);
173
174 let mut result = point_groups
175 .flat_map(|((pos, _), group)| {
176 let value_sum = group.into_iter().map(|x| x.1).sum();
177 let mut bedgraph = None;
178
179 if prev_pos != pos {
180 if acc_val == prev_bedgraph.value {
181 prev_bedgraph.set_end(pos);
182 } else {
183 bedgraph = Some(prev_bedgraph.clone());
184 prev_bedgraph = BedGraph::new(chrom, prev_pos, pos, acc_val);
185 }
186 }
187
188 acc_val += value_sum;
189 prev_pos = pos;
190 bedgraph
191 })
192 .collect::<Vec<_>>();
193 result.push(prev_bedgraph);
194 result
195 })
196 .flatten()
197 }
198}
199
200impl<T> MergeBed for T where T: Iterator + ?Sized {}
201
202struct MergeBedOutput<I, B, F> {
203 sorted_bed_iter: I,
204 merger: F,
205 accum: Option<((String, u64, u64), Vec<B>)>,
206}
207
208impl<I, F, B, O> Iterator for MergeBedOutput<I, B, F>
209where
210 I: Iterator<Item = B>,
211 B: BEDLike,
212 F: Fn(Vec<B>) -> O,
213{
214 type Item = O;
215 fn next(&mut self) -> Option<Self::Item> {
216 loop {
217 match self.sorted_bed_iter.next() {
218 None => {
219 return if self.accum.is_none() {
220 None
221 } else {
222 let (_, accum) = std::mem::replace(&mut self.accum, None).unwrap();
223 Some((self.merger)(accum))
224 }
225 }
226 Some(record) => match self.accum.as_mut() {
227 None => {
228 self.accum = Some((
229 (record.chrom().to_string(), record.start(), record.end()),
230 vec![record],
231 ))
232 }
233 Some(((chr, s, e), accum)) => {
234 let chr_ = record.chrom();
235 let s_ = record.start();
236 let e_ = record.end();
237 if chr != chr_ || s_ > *e {
238 let (_, acc) = std::mem::replace(
239 &mut self.accum,
240 Some(((chr_.to_string(), s_, e_), vec![record])),
241 )
242 .unwrap();
243 return Some((self.merger)(acc));
244 } else if s_ < *s {
245 panic!("input is not sorted")
246 } else if e_ > *e {
247 *e = e_;
248 accum.push(record);
249 } else {
250 accum.push(record);
251 }
252 }
253 },
254 }
255 }
256 }
257}
258
259#[derive(Debug, Clone)]
260pub struct SortBedOptions {
261 pub num_threads: Option<usize>,
262 pub chunk_size: usize,
263 pub compression: Option<u8>,
264 pub tmp_dir: Option<String>,
265}
266
267impl Default for SortBedOptions {
268 fn default() -> Self {
269 SortBedOptions {
270 num_threads: None,
271 chunk_size: 50000000,
272 compression: Some(1),
273 tmp_dir: None,
274 }
275 }
276}
277
278pub trait SortBed: Iterator {
279 fn sort_bed<B>(
281 self,
282 ) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
283 where
284 Self: Iterator<Item = B> + Sized,
285 B: BEDLike + Archive + for<'a> RkyvSerialize<DiskSerializer<'a>> + Send,
286 B::Archived: RkyvDeserialize<B, DiskDeserializer>,
287 {
288 self.sort_bed_with_options(SortBedOptions::default())
289 }
290
291 fn sort_bed_with_options<B>(
293 self,
294 opts: SortBedOptions,
295 ) -> Result<impl ExactSizeIterator + Iterator<Item = Result<B, ExternalChunkError>>, SortError>
296 where
297 Self: Iterator<Item = B> + Sized,
298 B: BEDLike + Archive + for<'a> RkyvSerialize<DiskSerializer<'a>> + Send,
299 B::Archived: RkyvDeserialize<B, DiskDeserializer>,
300 {
301 let mut sorter = ExternalSorterBuilder::new().with_chunk_size(opts.chunk_size);
302 if let Some(num_threads) = opts.num_threads {
303 sorter = sorter.num_threads(num_threads);
304 }
305 if let Some(compression) = opts.compression {
306 sorter = sorter.with_compression(compression as u32);
307 }
308 if let Some(tmp_dir) = opts.tmp_dir {
309 sorter = sorter.with_tmp_dir(tmp_dir);
310 }
311 sorter
312 .build()
313 .unwrap()
314 .sort_by(self, |x: &B, y: &B| x.compare(y))
315 }
316}
317
318impl<T> SortBed for T where T: Iterator + ?Sized {}
319
320#[cfg(test)]
321mod bed_tests {
322 use super::*;
323 use crate::extsort::ExternalSorterBuilder;
324 use itertools::Itertools;
325 use rand::Rng;
326
327 fn rand_bed(n: usize) -> Vec<GenomicRange> {
328 let mut rng = rand::thread_rng();
329 let mut result = Vec::with_capacity(n);
330 for _ in 0..n {
331 let bed = GenomicRange::new(
332 format!("chr{}", rng.gen_range(1..20)),
333 rng.gen_range(0..10000),
334 rng.gen_range(1000000..2000000),
335 );
336 result.push(bed);
337 }
338 result
339 }
340
341 #[test]
342 fn test_sort() {
343 let data1 = vec![
344 GenomicRange::new("chr1", 1020, 1230),
345 GenomicRange::new("chr1", 0, 500),
346 GenomicRange::new("chr2", 500, 1000),
347 GenomicRange::new("chr1", 500, 1000),
348 GenomicRange::new("chr1", 1000, 1230),
349 GenomicRange::new("chr1", 1000, 1230),
350 ];
351 let data2 = rand_bed(100000);
352
353 [data1, data2].into_iter().for_each(|mut data| {
354 let sorted1 = data
355 .clone()
356 .into_iter()
357 .sort_bed()
358 .unwrap()
359 .collect::<Result<Vec<_>, _>>()
360 .unwrap();
361 let sorted2 = ExternalSorterBuilder::new()
362 .with_chunk_size(1)
363 .with_compression(4)
364 .build()
365 .unwrap()
366 .sort(data.clone().into_iter())
367 .unwrap()
368 .collect::<Result<Vec<_>, _>>()
369 .unwrap();
370 data.sort();
371 assert_eq!(data, sorted1);
372 assert_eq!(data, sorted2);
373 })
374 }
375
376 #[test]
377 fn test_split() {
378 assert_eq!(
379 GenomicRange::new("chr1", 0, 1230)
380 .split_by_len(500)
381 .collect::<Vec<_>>(),
382 vec![
383 GenomicRange::new("chr1", 0, 500),
384 GenomicRange::new("chr1", 500, 1000),
385 GenomicRange::new("chr1", 1000, 1230),
386 ],
387 );
388 assert_eq!(
389 GenomicRange::new("chr1", 0, 1230)
390 .rsplit_by_len(500)
391 .collect::<Vec<_>>(),
392 vec![
393 GenomicRange::new("chr1", 730, 1230),
394 GenomicRange::new("chr1", 230, 730),
395 GenomicRange::new("chr1", 0, 230),
396 ],
397 );
398 assert_eq!(
399 GenomicRange::new("chr1", 500, 2500)
400 .split_by_len(500)
401 .collect::<Vec<_>>(),
402 GenomicRange::new("chr1", 500, 2500)
403 .rsplit_by_len(500)
404 .sorted()
405 .collect::<Vec<_>>(),
406 );
407 assert_eq!(
408 GenomicRange::new("chr1", 500, 2500)
409 .split_by_len(1)
410 .collect::<Vec<_>>(),
411 GenomicRange::new("chr1", 500, 2500)
412 .rsplit_by_len(1)
413 .sorted()
414 .collect::<Vec<_>>(),
415 );
416 }
417
418 #[test]
419 fn test_overlap() {
420 assert_eq!(
421 GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 150, 300)),
422 Some(GenomicRange::new("chr1", 150, 200)),
423 );
424
425 assert_eq!(
426 GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
427 Some(GenomicRange::new("chr1", 110, 190)),
428 );
429
430 assert_eq!(
431 GenomicRange::new("chr1", 100, 200).overlap(&GenomicRange::new("chr1", 90, 99)),
432 None,
433 );
434
435 assert_eq!(
436 GenomicRange::new("chr1", 111, 180).overlap(&GenomicRange::new("chr1", 110, 190)),
437 Some(GenomicRange::new("chr1", 111, 180)),
438 );
439
440 assert_eq!(
441 GenomicRange::new("chr1", 111, 200).overlap(&GenomicRange::new("chr1", 110, 190)),
442 Some(GenomicRange::new("chr1", 111, 190)),
443 );
444 }
445
446 #[test]
447 fn test_merge() {
448 let input = [
449 (0, 100),
450 (10, 20),
451 (50, 150),
452 (120, 160),
453 (155, 200),
454 (155, 220),
455 (500, 1000),
456 (2000, 2100),
457 (2100, 2200),
458 ]
459 .into_iter()
460 .map(|(a, b)| std::io::Result::Ok(GenomicRange::new("chr1", a, b)));
461 let expect: Vec<GenomicRange> = [(0, 220), (500, 1000), (2000, 2200)]
462 .into_iter()
463 .map(|(a, b)| GenomicRange::new("chr1", a, b))
464 .collect();
465 assert_eq!(
466 input
467 .clone()
468 .map(|x| x.unwrap())
469 .merge_sorted_bed()
470 .collect::<Vec<_>>(),
471 expect
472 );
473 }
474
475 #[test]
476 fn test_merge_bedgraph() {
477 let reads = vec![
478 BedGraph::new("chr1", 0, 100, 1),
479 BedGraph::new("chr1", 0, 100, 1),
480 BedGraph::new("chr1", 2, 100, 2),
481 BedGraph::new("chr1", 30, 60, 3),
482 BedGraph::new("chr1", 40, 50, 4),
483 BedGraph::new("chr1", 60, 65, 5),
484 ];
485
486 let actual: Vec<_> = reads.into_iter().merge_sorted_bedgraph().collect();
487 let expected = vec![
488 BedGraph::new("chr1", 0, 2, 2),
489 BedGraph::new("chr1", 2, 30, 4),
490 BedGraph::new("chr1", 30, 40, 7),
491 BedGraph::new("chr1", 40, 50, 11),
492 BedGraph::new("chr1", 50, 60, 7),
493 BedGraph::new("chr1", 60, 65, 9),
494 BedGraph::new("chr1", 65, 100, 4),
495 ];
496 assert_eq!(expected, actual);
497 }
498}