1use anyhow::Result;
2use std::fs::File;
3use std::io::BufRead;
4use std::path::{Path, PathBuf};
5
6use md5::{Digest, Md5};
7
8use flate2::write::GzEncoder;
9use flate2::Compression;
10use std::collections::{HashMap, HashSet};
11use std::fmt::Debug;
12use std::fmt::{self, Display};
13use std::io::{BufWriter, Error, Write};
14#[cfg(feature = "bigbed")]
15use tokio::runtime;
16
17#[cfg(feature = "bigbed")]
18use bigtools::beddata::BedParserStreamingIterator;
19#[cfg(feature = "bigbed")]
20use bigtools::{BedEntry, BigBedWrite};
21
22use crate::models::Region;
23#[cfg(feature = "http")]
24use crate::utils::get_dynamic_reader_from_url;
25use crate::utils::{get_chrom_sizes, get_dynamic_reader};
26
27#[cfg(feature = "dataframe")]
28use polars::prelude::*;
29#[cfg(feature = "dataframe")]
30use std::io::Cursor;
31
/// A set of genomic regions, optionally carrying the header lines and the
/// source path of the BED file it was loaded from.
#[derive(Clone, Debug)]
pub struct RegionSet {
    // Regions in the set; sorted by (chr, start) when built via `TryFrom` a path.
    pub regions: Vec<Region>,
    // Header text collected while parsing ("browser"/"track"/"#"/column-header
    // lines), concatenated; `None` when the file had no header.
    pub header: Option<String>,
    // Path the set was loaded from; `None` when built in memory.
    pub path: Option<PathBuf>,
}
42
/// Borrowing iterator over the regions of a [`RegionSet`], in stored order.
pub struct RegionSetIterator<'a> {
    // The set being iterated.
    region_set: &'a RegionSet,
    // Index of the next region to yield.
    index: usize,
}
47
48impl TryFrom<&Path> for RegionSet {
49 type Error = anyhow::Error;
50
51 fn try_from(value: &Path) -> Result<Self> {
57 let path = value;
58
59 let mut new_regions: Vec<Region> = Vec::new();
60
61 let reader = match path.is_file() {
62 true => get_dynamic_reader(path).expect("!Can't read file"),
63 #[cfg(feature = "http")]
64 false => {
65 match get_dynamic_reader_from_url(path) {
66 Ok(reader) => reader,
67 Err(_) => {
68 let bbid = path.to_str().ok_or_else(|| {
70 anyhow::anyhow!("BEDbase identifier is not valid UTF-8: {:?}", path)
71 })?;
72
73 let fallback_url = format!(
74 "https://api.bedbase.org/v1/files/files/{}/{}/{}.bed.gz",
75 &bbid[0..1],
76 &bbid[1..2],
77 bbid
78 );
79
80 let fallback_path = PathBuf::from(fallback_url);
81
82 get_dynamic_reader_from_url(&fallback_path)
83 .expect("!Can't get file from path, url, or BEDbase identifier")
84 }
85 }
86 }
87 #[cfg(not(feature = "http"))]
88 false => {
89 return Err(anyhow::anyhow!(
90 "File not found and HTTP feature not enabled: {}",
91 path.display()
92 ));
93 }
94 };
95
96 let mut header: String = String::new();
97
98 let mut first_line: bool = true;
99
100 for line in reader.lines() {
101 let string_line = line?;
102
103 let parts: Vec<String> = string_line.split('\t').map(|s| s.to_string()).collect();
104
105 if string_line.starts_with("browser")
106 | string_line.starts_with("track")
107 | string_line.starts_with("#")
108 {
109 header.push_str(&string_line);
110 first_line = false;
111 continue;
112 }
113
114 if first_line {
116 if parts.len() >= 3 {
117 let is_header: bool = match parts[1].parse::<u32>() {
118 Ok(_num) => false,
119 Err(_) => true,
120 };
121 if is_header {
122 header.push_str(&string_line);
123 first_line = false;
124 continue;
125 }
126 }
127 first_line = false;
128 }
129
130 new_regions.push(Region {
131 chr: parts[0].to_owned(),
132
133 start: match parts[1].parse() {
135 Ok(start) => start,
136 Err(_err) => {
137 return Err(Error::other(format!(
138 "Error in parsing start position: {:?}",
139 parts
140 ))
141 .into())
142 }
143 },
144 end: match parts[2].parse() {
145 Ok(end) => end,
146 Err(_err) => {
147 return Err(anyhow::Error::from(Error::other(format!(
148 "Error in parsing end position: {:?}",
149 parts
150 ))))
151 }
152 },
153 rest: Some(parts[3..].join("\t")).filter(|s| !s.is_empty()),
154 });
155 }
156 if new_regions.is_empty() {
157 let new_error = Error::other(format!(
158 "Corrupted file. 0 regions found in the file: {}",
159 path.display()
160 ));
161 return Err(new_error.into());
162 }
163
164 let mut rs = RegionSet {
165 regions: new_regions,
166 header: match header.is_empty() {
167 true => None,
168 false => Some(header),
169 },
170 path: Some(value.to_owned()),
171 };
172 rs.sort();
174
175 Ok(rs)
176 }
177}
178
179impl TryFrom<&str> for RegionSet {
180 type Error = anyhow::Error;
181
182 fn try_from(value: &str) -> Result<Self> {
183 RegionSet::try_from(Path::new(value))
184 }
185}
186
187impl TryFrom<String> for RegionSet {
188 type Error = anyhow::Error;
189
190 fn try_from(value: String) -> Result<Self> {
191 RegionSet::try_from(Path::new(&value))
193 }
194}
195
196impl TryFrom<PathBuf> for RegionSet {
197 type Error = anyhow::Error;
198
199 fn try_from(value: PathBuf) -> Result<Self> {
200 RegionSet::try_from(value.as_path())
201 }
202}
203
204impl From<Vec<Region>> for RegionSet {
205 fn from(regions: Vec<Region>) -> Self {
206 let path = None;
207
208 RegionSet {
209 regions,
210 header: None,
211 path,
212 }
213 }
214}
215
216impl From<&[u8]> for RegionSet {
217 fn from(value: &[u8]) -> Self {
218 let region_str = String::from_utf8_lossy(value);
219 let regions: Vec<Region> = region_str
220 .split('\n')
221 .map(|line| {
222 let parts = line.split('\t').collect::<Vec<&str>>();
223
224 let chr = parts[0].to_string();
225 let start = parts[1].parse::<u32>().unwrap();
226 let end = parts[2].parse::<u32>().unwrap();
227 let rest = Some(parts[3..].join("\t")).filter(|s| !s.is_empty());
228
229 Region {
230 chr,
231 start,
232 end,
233 rest,
234 }
235 })
236 .collect();
237
238 RegionSet {
239 regions,
240 header: None,
241 path: None,
242 }
243 }
244}
245
246impl<'a> Iterator for RegionSetIterator<'a> {
247 type Item = &'a Region;
248
249 fn next(&mut self) -> Option<Self::Item> {
250 if self.index < self.region_set.regions.len() {
251 let region = &self.region_set.regions[self.index];
252 self.index += 1;
253 Some(region)
254 } else {
255 None
256 }
257 }
258}
259
260impl<'a> IntoIterator for &'a RegionSet {
261 type Item = &'a Region;
262 type IntoIter = RegionSetIterator<'a>;
263
264 fn into_iter(self) -> Self::IntoIter {
265 RegionSetIterator {
266 region_set: self,
267 index: 0,
268 }
269 }
270}
271
272impl RegionSet {
273 pub fn to_bed<T: AsRef<Path>>(&self, path: T) -> std::io::Result<()> {
279 let path = path.as_ref();
280 if path.exists() {
281 println!("Bed file already exists. Overwriting existing file")
282 }
283
284 if let Some(parent) = path.parent() {
285 std::fs::create_dir_all(parent)?;
286 }
287
288 let mut file = File::create(path).unwrap();
289
290 for region in &self.regions {
291 writeln!(file, "{}", region.as_string())?;
292 }
293 Ok(())
294 }
295
296 pub fn to_bed_gz<T: AsRef<Path>>(&self, path: T) -> std::io::Result<()> {
302 let path = path.as_ref();
303 if path.exists() {
304 println!("Bed file already exists. Overwriting existing file")
305 }
306
307 if let Some(parent) = path.parent() {
308 std::fs::create_dir_all(parent)?;
309 }
310
311 let file = File::create(path)?;
312 let mut buffer: String = String::new();
313
314 for region in &self.regions {
315 buffer.push_str(&format!("{}\n", region.as_string(),));
316 }
317
318 let mut encoder = GzEncoder::new(BufWriter::new(file), Compression::best());
319 encoder.write_all(buffer.as_bytes())?;
320
321 Ok(())
322 }
323
324 pub fn identifier(&self) -> String {
333 let mut chrs = String::new();
334 let mut starts = String::new();
335 let mut ends = String::new();
336
337 let mut first = true;
338 for region in &self.regions {
339 if !first {
340 chrs.push(',');
341 starts.push(',');
342 ends.push(',');
343 }
344 chrs.push_str(®ion.chr);
345 starts.push_str(®ion.start.to_string());
346 ends.push_str(®ion.end.to_string());
347
348 first = false;
349 }
350
351 let mut hasher = Md5::new();
354 hasher.update(chrs);
355 let chrom_hash = hasher.finalize();
356
357 let mut hasher = Md5::new();
358 hasher.update(starts);
359 let start_hash = hasher.finalize();
360
361 let mut hasher = Md5::new();
362 hasher.update(ends);
363 let end_hash = hasher.finalize();
364
365 let combined = format!("{:x},{:x},{:x}", chrom_hash, start_hash, end_hash);
366
367 let mut hasher = Md5::new();
368 hasher.update(combined);
369 let hash = hasher.finalize();
370 let bed_digest: String = format!("{:x}", hash);
371
372 bed_digest
373 }
374
375 pub fn file_digest(&self) -> String {
376 let mut buffer: String = String::new();
377 for region in &self.regions {
378 buffer.push_str(&format!("{}\n", region.as_string(),));
379 }
380
381 let mut hasher = Md5::new();
382
383 hasher.update(buffer);
384 let hash = hasher.finalize();
385 let file_digest: String = format!("{:x}", hash);
386
387 file_digest
388 }
389
390 pub fn iter_chroms(&self) -> impl Iterator<Item = &String> {
394 let unique_chroms: HashSet<&String> = self.regions.iter().map(|r| &r.chr).collect();
395 unique_chroms.into_iter()
396 }
397
398 pub fn iter_chr_regions<'a>(&'a self, chr: &'a str) -> impl Iterator<Item = &'a Region> {
405 self.regions.iter().filter(move |r| r.chr == chr)
406 }
407
    /// Writes the regions to `out_path` as a bigBed file, using chromosome
    /// sizes read from the `chrom_size` file.
    ///
    /// Regions whose chromosome is absent from the sizes file are skipped
    /// with a warning on stderr; after more than 40 such warnings the
    /// function panics, assuming the wrong chrom-sizes file was supplied.
    /// Missing parent directories of `out_path` are created.
    ///
    /// # Panics
    /// Panics when the tokio thread pool or the bigBed file cannot be
    /// created, or when too many chromosomes are missing from the sizes file.
    ///
    /// # Errors
    /// Returns an error when creating parent directories or writing the
    /// bigBed data fails.
    #[cfg(feature = "bigbed")]
    pub fn to_bigbed<T: AsRef<Path>>(&self, out_path: T, chrom_size: T) -> Result<()> {
        let out_path = out_path.as_ref();

        if out_path.exists() {
            println!("Bed file already exists. Overwriting existing file")
        }

        if let Some(parent) = out_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        // Chromosome name -> length, parsed from the chrom-sizes file.
        let chrom_sizes: HashMap<String, u32> = get_chrom_sizes(chrom_size);

        let mut warnings_count: i32 = 0;
        // Lazily convert each region into bigtools' (chrom, BedEntry) form;
        // this closure actually runs while `bb_out.write` consumes the
        // iterator below, so the warnings/panic fire during the write.
        let region_vector = self.regions.iter().map(|i| {
            if !chrom_sizes.contains_key(&i.chr) {
                eprintln!(
                    "Warning:: Chromosome is not found in Chrom sizes file. Chr: '{}'",
                    i.chr
                );
                // Give up entirely once the sizes file looks wrong.
                if warnings_count > 40 {
                    panic!("Incorrect chrom sizes provided. Unable to create bigBed file!");
                }

                warnings_count += 1;
                return None;
            }
            Some(Ok::<_, Error>((
                i.chr.clone(),
                BedEntry {
                    start: i.start,
                    end: i.end,
                    // Extra BED columns, prefixed with the separating tab.
                    rest: i
                        .rest
                        .as_deref()
                        .map_or(String::new(), |s| format!("\t{}", s)),
                },
            )))
        });

        // Drop the skipped (None) regions.
        #[allow(clippy::option_filter_map)]
        let region_vector = region_vector.filter(|e| e.is_some()).map(|e| e.unwrap());

        // One worker thread per available core (fallback: single-threaded).
        let runtime = runtime::Builder::new_multi_thread()
            .worker_threads(
                std::thread::available_parallelism()
                    .map(|c| c.into())
                    .unwrap_or(1),
            )
            .build()
            .expect("Unable to create thread pool.");

        let mut bb_out = BigBedWrite::create_file(out_path, chrom_sizes.clone())
            .expect("Failed to create bigBed file.");

        bb_out.options.max_zooms = 8;

        // NOTE(review): the `true` flag's exact meaning (sorted-input /
        // chrom-ordering assumption) should be confirmed against the
        // bigtools `BedParserStreamingIterator` docs.
        let data = BedParserStreamingIterator::wrap_iter(region_vector.into_iter(), true);
        bb_out.write(data, runtime)?;
        Ok(())
    }
478
479 pub fn sort(&mut self) {
485 self.regions
486 .sort_by(|a, b| a.chr.cmp(&b.chr).then_with(|| a.start.cmp(&b.start)));
487 }
488
489 pub fn is_empty(&self) -> bool {
493 if self.regions.len() == 0 {
494 return true;
495 }
496 false
497 }
498
499 pub fn region_widths(&self) -> Result<Vec<u32>> {
503 let mut widths: Vec<u32> = Vec::new();
504
505 for region in &self.regions {
506 widths.push(region.width())
507 }
508
509 Ok(widths)
510 }
511
512 pub fn mean_region_width(&self) -> f64 {
516 let sum: u32 = self
517 .regions
518 .iter()
519 .map(|region| region.end - region.start)
520 .sum();
521 let count: u32 = self.regions.len() as u32;
522
523 ((sum as f64 / count as f64) * 100.0).round() / 100.0
525 }
526
527 pub fn calc_mid_points(&self) -> HashMap<String, Vec<u32>> {
531 let mut mid_points: HashMap<String, Vec<u32>> = HashMap::new();
532 for chromosome in self.iter_chroms() {
533 let mut chr_mid_points: Vec<u32> = Vec::new();
534 for region in self.iter_chr_regions(chromosome) {
535 chr_mid_points.push(region.mid_point());
536 }
537 mid_points.insert(chromosome.clone(), chr_mid_points);
538 }
539 mid_points
540 }
541
542 pub fn len(&self) -> usize {
548 self.regions.len()
549 }
550
551 pub fn get_max_end_per_chr(&self) -> HashMap<String, u32> {
555 let mut result: HashMap<String, u32> = HashMap::new();
556
557 let mut current_chr: &String = &self.regions[0].chr;
558 let mut max_end: u32 = self.regions[0].end;
559
560 for r in &self.regions[1..] {
561 if &r.chr == current_chr {
562 max_end = max_end.max(r.end);
564 } else {
565 result.insert(current_chr.clone(), max_end);
567 current_chr = &r.chr;
568 max_end = r.end;
569 }
570 }
571
572 result.insert(current_chr.clone(), max_end);
574
575 result
576 }
577
578 pub fn nucleotides_length(&self) -> u32 {
582 let mut total_count: u32 = 0;
583 for r in &self.regions {
584 total_count += r.width();
585 }
586 total_count
587 }
588
    /// Converts the region set into a polars `DataFrame`.
    ///
    /// The regions are serialized to an in-memory TSV (chr, start, end,
    /// plus any extra columns) and re-parsed through polars' CSV reader
    /// with type inference over up to 10000 rows and no header row.
    ///
    /// # Errors
    /// Returns any `PolarsError` raised while parsing the generated TSV.
    #[cfg(feature = "dataframe")]
    pub fn to_polars(&self) -> PolarsResult<DataFrame> {
        // Build one tab-separated line per region.
        let data: String = self
            .regions
            .iter()
            .map(|region| {
                if let Some(rest) = region.rest.as_deref() {
                    format!("{}\t{}\t{}\t{}", region.chr, region.start, region.end, rest,)
                } else {
                    format!("{}\t{}\t{}", region.chr, region.start, region.end,)
                }
            })
            .collect::<Vec<_>>()
            .join("\n");

        // Feed the in-memory TSV to polars through a Cursor "file handle".
        let cursor = Cursor::new(data);

        let df = CsvReadOptions::default()
            .with_has_header(false)
            .map_parse_options(|parse_options| parse_options.with_separator(b'\t'))
            .with_infer_schema_length(Some(10000))
            .into_reader_with_file_handle(cursor)
            .finish()?;

        Ok(df)
    }
619}
620
621impl Display for RegionSet {
622 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
623 write!(f, "RegionSet with {} regions.", self.len())
624 }
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    use pretty_assertions::assert_eq;
    use rstest::*;

    /// Builds the path to a fixture file under `../tests/data/regionset`.
    fn get_test_path(file_name: &str) -> Result<PathBuf, Error> {
        let file_path: PathBuf = std::env::current_dir()
            .unwrap()
            .join("../tests/data/regionset")
            .join(file_name);
        Ok(file_path)
    }

    #[rstest]
    fn test_open_from_path() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        assert!(RegionSet::try_from(file_path.as_path()).is_ok());
    }

    #[rstest]
    fn test_open_from_string() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        assert!(RegionSet::try_from(file_path.to_str().unwrap()).is_ok());
    }

    #[rstest]
    fn test_open_from_url() {
        let file_path = String::from(
            "https://www.encodeproject.org/files/ENCFF321QPN/@@download/ENCFF321QPN.bed.gz",
        );
        assert!(RegionSet::try_from(file_path).is_ok());
    }

    #[rstest]
    #[ignore = "Avoid BEDbase dependency in CI"]
    fn test_open_from_bedbase() {
        let bbid = String::from("6b2e163a1d4319d99bd465c6c78a9741");
        let region_set = RegionSet::try_from(bbid);
        // `assert!` instead of `assert_eq!(…, true)` (clippy: bool_assert_comparison).
        assert!(region_set.is_ok());
        assert_eq!(
            region_set.unwrap().identifier(),
            "6b2e163a1d4319d99bd465c6c78a9741"
        );
    }

    #[rstest]
    fn test_open_bed_gz() {
        let file_path = get_test_path("dummy.narrowPeak.bed.gz").unwrap();
        assert!(RegionSet::try_from(file_path.to_str().unwrap()).is_ok());
    }

    #[rstest]
    fn test_calculate_identifier() {
        let file_path = get_test_path("dummy.narrowPeak.bed.gz").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!("f0b2cf73383b53bd97ff525a0380f200", region_set.identifier());
    }

    #[rstest]
    fn test_save_bed_gz() {
        let file_path = get_test_path("dummy.narrowPeak.bed.gz").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let tempdir = tempfile::tempdir().unwrap();

        // `keep()` persists the tempdir so the written file outlives the guard.
        let mut new_file_path = tempdir.keep();
        new_file_path.push("new_file.bed.gz");

        assert!(region_set.to_bed_gz(new_file_path.as_path()).is_ok());

        // Round-trip: re-reading the written file must give the same identifier.
        let new_region = RegionSet::try_from(new_file_path.as_path());
        assert!(new_region.is_ok());
        assert_eq!(new_region.unwrap().identifier(), region_set.identifier())
    }

    #[rstest]
    fn test_save_bed() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let tempdir = tempfile::tempdir().unwrap();

        let mut new_file_path = tempdir.keep();
        new_file_path.push("new_bedfile.bed");

        assert!(region_set.to_bed(new_file_path.as_path()).is_ok());

        let new_region = RegionSet::try_from(new_file_path.as_path());
        assert!(new_region.is_ok());
        assert_eq!(new_region.unwrap().identifier(), region_set.identifier())
    }

    #[cfg(feature = "bigbed")]
    #[rstest]
    fn test_save_bigbed() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let chrom_sizes_path: PathBuf = std::env::current_dir()
            .unwrap()
            .join("../tests/data/regionset/dummy_chrom_sizes");

        let tempdir = tempfile::tempdir().unwrap();
        let mut new_file_path = tempdir.keep();
        new_file_path.push("new.bigbed");

        assert!(region_set
            .to_bigbed(new_file_path.as_path(), chrom_sizes_path.as_path())
            .is_ok());
    }

    #[rstest]
    fn test_read_headers() {
        let file_path = get_test_path("dummy_headers.bed").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert!(region_set.header.is_some());
        assert_eq!(region_set.path.unwrap(), file_path);
    }

    #[rstest]
    fn test_is_empty() {
        let file_path = get_test_path("dummy_headers.bed").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert!(!region_set.is_empty());
    }

    #[rstest]
    fn test_file_digest() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.file_digest(), "6224c4d40832b3e0889250f061e01120");
        assert_eq!(region_set.identifier(), "f0b2cf73383b53bd97ff525a0380f200")
    }

    #[rstest]
    fn test_mean_region_width() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.mean_region_width(), 4.22)
    }
    #[rstest]
    fn test_open_file_with_incorrect_headers() {
        let file_path = get_test_path("dummy_incorrect_headers.bed").unwrap();
        let _region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();
    }

    #[rstest]
    fn test_chr_length() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();
        assert_eq!(*region_set.get_max_end_per_chr().get("chr1").unwrap(), 36);
        assert_eq!(region_set.get_max_end_per_chr().len(), 1)
    }

    #[rstest]
    fn test_total_nucleotides_function() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.nucleotides_length(), 38)
    }

    #[rstest]
    fn test_iter_chroms() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.iter_chroms().collect::<Vec<_>>().len(), 1)
    }

    #[cfg(feature = "dataframe")]
    #[rstest]
    fn test_polars() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();
        let rs_polars = region_set.to_polars().unwrap();
        println!("Number of columns: {:?}", rs_polars.get_columns().len());
        assert_eq!(rs_polars.get_columns().len(), 10);
    }

    #[rstest]
    fn test_calc_mid_points() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let mid_points = region_set.calc_mid_points();
        assert_eq!(mid_points.get("chr1").unwrap().len(), 9);
        assert_eq!(mid_points.len(), 1);
        assert_eq!(
            mid_points
                .get("chr1")
                .unwrap()
                .iter()
                .min()
                .copied()
                .unwrap(),
            6u32
        );
    }
}