1use anyhow::Result;
2use std::fs::File;
3use std::io::BufRead;
4use std::path::{Path, PathBuf};
5
6use md5::{Digest, Md5};
7
8use flate2::write::GzEncoder;
9use flate2::Compression;
10use std::collections::{HashMap, HashSet};
11use std::fmt::Debug;
12use std::fmt::{self, Display};
13use std::io::{BufWriter, Error, Write};
14#[cfg(feature = "bigbed")]
15use tokio::runtime;
16
17#[cfg(feature = "bigbed")]
18use bigtools::beddata::BedParserStreamingIterator;
19#[cfg(feature = "bigbed")]
20use bigtools::{BedEntry, BigBedWrite};
21
22use crate::models::Region;
23#[cfg(feature = "http")]
24use crate::utils::get_dynamic_reader_from_url;
25use crate::utils::get_dynamic_reader;
26#[cfg(feature = "bigbed")]
27use crate::utils::get_chrom_sizes;
28
29#[cfg(feature = "dataframe")]
30use polars::prelude::*;
31#[cfg(feature = "dataframe")]
32use std::io::Cursor;
33
/// An in-memory collection of genomic regions, optionally carrying the
/// header lines and source path of the file it was loaded from.
#[derive(Clone, Debug)]
pub struct RegionSet {
    /// The regions; sorted by (chr, start) when constructed from a file
    /// (see `TryFrom<&Path>`), otherwise in insertion order.
    pub regions: Vec<Region>,
    /// Header text captured on load (`browser`/`track`/`#` lines or an
    /// unmarked column-name row); `None` when the file had no header.
    pub header: Option<String>,
    /// Path the set was loaded from; `None` for in-memory construction.
    pub path: Option<PathBuf>,
}
44
/// Borrowing iterator over a `RegionSet`, yielding `&Region` in the
/// set's stored order. Created by `(&RegionSet).into_iter()`.
pub struct RegionSetIterator<'a> {
    region_set: &'a RegionSet,
    // Index of the next region to yield.
    index: usize,
}
49
50impl TryFrom<&Path> for RegionSet {
51 type Error = anyhow::Error;
52
53 fn try_from(value: &Path) -> Result<Self> {
59 let path = value;
60
61 let mut new_regions: Vec<Region> = Vec::new();
62
63 let reader = match path.is_file() {
64 true => get_dynamic_reader(path).expect("!Can't read file"),
65 #[cfg(feature = "http")]
66 false => {
67 match get_dynamic_reader_from_url(path) {
68 Ok(reader) => reader,
69 Err(_) => {
70 let bbid = path.to_str().ok_or_else(|| {
72 anyhow::anyhow!("BEDbase identifier is not valid UTF-8: {:?}", path)
73 })?;
74
75 let fallback_url = format!(
76 "https://api.bedbase.org/v1/files/files/{}/{}/{}.bed.gz",
77 &bbid[0..1],
78 &bbid[1..2],
79 bbid
80 );
81
82 let fallback_path = PathBuf::from(fallback_url);
83
84 get_dynamic_reader_from_url(&fallback_path)
85 .expect("!Can't get file from path, url, or BEDbase identifier")
86 }
87 }
88 }
89 #[cfg(not(feature = "http"))]
90 false => {
91 return Err(anyhow::anyhow!(
92 "File not found and HTTP feature not enabled: {}",
93 path.display()
94 ));
95 }
96 };
97
98 let mut header: String = String::new();
99
100 let mut first_line: bool = true;
101
102 for line in reader.lines() {
103 let string_line = line?;
104
105 let parts: Vec<String> = string_line.split('\t').map(|s| s.to_string()).collect();
106
107 if string_line.starts_with("browser")
108 | string_line.starts_with("track")
109 | string_line.starts_with("#")
110 {
111 header.push_str(&string_line);
112 first_line = false;
113 continue;
114 }
115
116 if first_line {
118 if parts.len() >= 3 {
119 let is_header: bool = match parts[1].parse::<u32>() {
120 Ok(_num) => false,
121 Err(_) => true,
122 };
123 if is_header {
124 header.push_str(&string_line);
125 first_line = false;
126 continue;
127 }
128 }
129 first_line = false;
130 }
131
132 new_regions.push(Region {
133 chr: parts[0].to_owned(),
134
135 start: match parts[1].parse() {
137 Ok(start) => start,
138 Err(_err) => {
139 return Err(Error::other(format!(
140 "Error in parsing start position: {:?}",
141 parts
142 ))
143 .into())
144 }
145 },
146 end: match parts[2].parse() {
147 Ok(end) => end,
148 Err(_err) => {
149 return Err(anyhow::Error::from(Error::other(format!(
150 "Error in parsing end position: {:?}",
151 parts
152 ))))
153 }
154 },
155 rest: Some(parts[3..].join("\t")).filter(|s| !s.is_empty()),
156 });
157 }
158 if new_regions.is_empty() {
159 let new_error = Error::other(format!(
160 "Corrupted file. 0 regions found in the file: {}",
161 path.display()
162 ));
163 return Err(new_error.into());
164 }
165
166 let mut rs = RegionSet {
167 regions: new_regions,
168 header: match header.is_empty() {
169 true => None,
170 false => Some(header),
171 },
172 path: Some(value.to_owned()),
173 };
174 rs.sort();
176
177 Ok(rs)
178 }
179}
180
181impl TryFrom<&str> for RegionSet {
182 type Error = anyhow::Error;
183
184 fn try_from(value: &str) -> Result<Self> {
185 RegionSet::try_from(Path::new(value))
186 }
187}
188
189impl TryFrom<String> for RegionSet {
190 type Error = anyhow::Error;
191
192 fn try_from(value: String) -> Result<Self> {
193 RegionSet::try_from(Path::new(&value))
195 }
196}
197
198impl TryFrom<PathBuf> for RegionSet {
199 type Error = anyhow::Error;
200
201 fn try_from(value: PathBuf) -> Result<Self> {
202 RegionSet::try_from(value.as_path())
203 }
204}
205
206impl From<Vec<Region>> for RegionSet {
207 fn from(regions: Vec<Region>) -> Self {
208 let path = None;
209
210 RegionSet {
211 regions,
212 header: None,
213 path,
214 }
215 }
216}
217
218impl From<&[u8]> for RegionSet {
219 fn from(value: &[u8]) -> Self {
220 let region_str = String::from_utf8_lossy(value);
221 let regions: Vec<Region> = region_str
222 .split('\n')
223 .map(|line| {
224 let parts = line.split('\t').collect::<Vec<&str>>();
225
226 let chr = parts[0].to_string();
227 let start = parts[1].parse::<u32>().unwrap();
228 let end = parts[2].parse::<u32>().unwrap();
229 let rest = Some(parts[3..].join("\t")).filter(|s| !s.is_empty());
230
231 Region {
232 chr,
233 start,
234 end,
235 rest,
236 }
237 })
238 .collect();
239
240 RegionSet {
241 regions,
242 header: None,
243 path: None,
244 }
245 }
246}
247
248impl<'a> Iterator for RegionSetIterator<'a> {
249 type Item = &'a Region;
250
251 fn next(&mut self) -> Option<Self::Item> {
252 if self.index < self.region_set.regions.len() {
253 let region = &self.region_set.regions[self.index];
254 self.index += 1;
255 Some(region)
256 } else {
257 None
258 }
259 }
260}
261
262impl<'a> IntoIterator for &'a RegionSet {
263 type Item = &'a Region;
264 type IntoIter = RegionSetIterator<'a>;
265
266 fn into_iter(self) -> Self::IntoIter {
267 RegionSetIterator {
268 region_set: self,
269 index: 0,
270 }
271 }
272}
273
274impl RegionSet {
275 pub fn to_bed<T: AsRef<Path>>(&self, path: T) -> std::io::Result<()> {
281 let path = path.as_ref();
282 if path.exists() {
283 println!("Bed file already exists. Overwriting existing file")
284 }
285
286 if let Some(parent) = path.parent() {
287 std::fs::create_dir_all(parent)?;
288 }
289
290 let mut file = File::create(path).unwrap();
291
292 for region in &self.regions {
293 writeln!(file, "{}", region.as_string())?;
294 }
295 Ok(())
296 }
297
298 pub fn to_bed_gz<T: AsRef<Path>>(&self, path: T) -> std::io::Result<()> {
304 let path = path.as_ref();
305 if path.exists() {
306 println!("Bed file already exists. Overwriting existing file")
307 }
308
309 if let Some(parent) = path.parent() {
310 std::fs::create_dir_all(parent)?;
311 }
312
313 let file = File::create(path)?;
314 let mut buffer: String = String::new();
315
316 for region in &self.regions {
317 buffer.push_str(&format!("{}\n", region.as_string(),));
318 }
319
320 let mut encoder = GzEncoder::new(BufWriter::new(file), Compression::best());
321 encoder.write_all(buffer.as_bytes())?;
322
323 Ok(())
324 }
325
326 pub fn identifier(&self) -> String {
335 let mut chrs = String::new();
336 let mut starts = String::new();
337 let mut ends = String::new();
338
339 let mut first = true;
340 for region in &self.regions {
341 if !first {
342 chrs.push(',');
343 starts.push(',');
344 ends.push(',');
345 }
346 chrs.push_str(®ion.chr);
347 starts.push_str(®ion.start.to_string());
348 ends.push_str(®ion.end.to_string());
349
350 first = false;
351 }
352
353 let mut hasher = Md5::new();
356 hasher.update(chrs);
357 let chrom_hash = hasher.finalize();
358
359 let mut hasher = Md5::new();
360 hasher.update(starts);
361 let start_hash = hasher.finalize();
362
363 let mut hasher = Md5::new();
364 hasher.update(ends);
365 let end_hash = hasher.finalize();
366
367 let combined = format!("{:x},{:x},{:x}", chrom_hash, start_hash, end_hash);
368
369 let mut hasher = Md5::new();
370 hasher.update(combined);
371 let hash = hasher.finalize();
372 let bed_digest: String = format!("{:x}", hash);
373
374 bed_digest
375 }
376
377 pub fn file_digest(&self) -> String {
378 let mut buffer: String = String::new();
379 for region in &self.regions {
380 buffer.push_str(&format!("{}\n", region.as_string(),));
381 }
382
383 let mut hasher = Md5::new();
384
385 hasher.update(buffer);
386 let hash = hasher.finalize();
387 let file_digest: String = format!("{:x}", hash);
388
389 file_digest
390 }
391
392 pub fn iter_chroms(&self) -> impl Iterator<Item = &String> {
396 let unique_chroms: HashSet<&String> = self.regions.iter().map(|r| &r.chr).collect();
397 unique_chroms.into_iter()
398 }
399
400 pub fn iter_chr_regions<'a>(&'a self, chr: &'a str) -> impl Iterator<Item = &'a Region> {
407 self.regions.iter().filter(move |r| r.chr == chr)
408 }
409
410 #[cfg(feature = "bigbed")]
418 pub fn to_bigbed<T: AsRef<Path>>(&self, out_path: T, chrom_size: T) -> Result<()> {
419 let out_path = out_path.as_ref();
420
421 if out_path.exists() {
422 println!("Bed file already exists. Overwriting existing file")
423 }
424
425 if let Some(parent) = out_path.parent() {
426 std::fs::create_dir_all(parent)?;
427 }
428 let chrom_sizes: HashMap<String, u32> = get_chrom_sizes(chrom_size);
429
430 let mut warnings_count: i32 = 0;
431 let region_vector = self.regions.iter().map(|i| {
432 if !chrom_sizes.contains_key(&i.chr) {
434 eprintln!(
435 "Warning:: Chromosome is not found in Chrom sizes file. Chr: '{}'",
436 i.chr
437 );
438 if warnings_count > 40 {
439 panic!("Incorrect chrom sizes provided. Unable to create bigBed file!");
440 }
441
442 warnings_count += 1;
443 return None;
444 }
445 Some(Ok::<_, Error>((
446 i.chr.clone(),
447 BedEntry {
448 start: i.start,
449 end: i.end,
450 rest: i
451 .rest
452 .as_deref()
453 .map_or(String::new(), |s| format!("\t{}", s)),
454 },
455 )))
456 });
457
458 #[allow(clippy::option_filter_map)]
459 let region_vector = region_vector.filter(|e| e.is_some()).map(|e| e.unwrap());
461
462 let runtime = runtime::Builder::new_multi_thread()
463 .worker_threads(
464 std::thread::available_parallelism()
465 .map(|c| c.into())
466 .unwrap_or(1),
467 )
468 .build()
469 .expect("Unable to create thread pool.");
470
471 let mut bb_out = BigBedWrite::create_file(out_path, chrom_sizes.clone())
472 .expect("Failed to create bigBed file.");
473
474 bb_out.options.max_zooms = 8;
475
476 let data = BedParserStreamingIterator::wrap_iter(region_vector.into_iter(), true);
477 bb_out.write(data, runtime)?;
478 Ok(())
479 }
480
481 pub fn sort(&mut self) {
487 self.regions
488 .sort_by(|a, b| a.chr.cmp(&b.chr).then_with(|| a.start.cmp(&b.start)));
489 }
490
491 pub fn is_empty(&self) -> bool {
495 if self.regions.len() == 0 {
496 return true;
497 }
498 false
499 }
500
501 pub fn region_widths(&self) -> Result<Vec<u32>> {
505 let mut widths: Vec<u32> = Vec::new();
506
507 for region in &self.regions {
508 widths.push(region.width())
509 }
510
511 Ok(widths)
512 }
513
514 pub fn mean_region_width(&self) -> f64 {
518 let sum: u32 = self
519 .regions
520 .iter()
521 .map(|region| region.end - region.start)
522 .sum();
523 let count: u32 = self.regions.len() as u32;
524
525 ((sum as f64 / count as f64) * 100.0).round() / 100.0
527 }
528
529 pub fn calc_mid_points(&self) -> HashMap<String, Vec<u32>> {
533 let mut mid_points: HashMap<String, Vec<u32>> = HashMap::new();
534 for chromosome in self.iter_chroms() {
535 let mut chr_mid_points: Vec<u32> = Vec::new();
536 for region in self.iter_chr_regions(chromosome) {
537 chr_mid_points.push(region.mid_point());
538 }
539 mid_points.insert(chromosome.clone(), chr_mid_points);
540 }
541 mid_points
542 }
543
544 pub fn len(&self) -> usize {
550 self.regions.len()
551 }
552
553 pub fn get_max_end_per_chr(&self) -> HashMap<String, u32> {
557 let mut result: HashMap<String, u32> = HashMap::new();
558
559 let mut current_chr: &String = &self.regions[0].chr;
560 let mut max_end: u32 = self.regions[0].end;
561
562 for r in &self.regions[1..] {
563 if &r.chr == current_chr {
564 max_end = max_end.max(r.end);
566 } else {
567 result.insert(current_chr.clone(), max_end);
569 current_chr = &r.chr;
570 max_end = r.end;
571 }
572 }
573
574 result.insert(current_chr.clone(), max_end);
576
577 result
578 }
579
580 pub fn nucleotides_length(&self) -> u32 {
584 let mut total_count: u32 = 0;
585 for r in &self.regions {
586 total_count += r.width();
587 }
588 total_count
589 }
590
591 #[cfg(feature = "dataframe")]
595 pub fn to_polars(&self) -> PolarsResult<DataFrame> {
596 let data: String = self
598 .regions
599 .iter()
600 .map(|region| {
601 if let Some(rest) = region.rest.as_deref() {
602 format!("{}\t{}\t{}\t{}", region.chr, region.start, region.end, rest,)
603 } else {
604 format!("{}\t{}\t{}", region.chr, region.start, region.end,)
605 }
606 })
607 .collect::<Vec<_>>()
608 .join("\n");
609
610 let cursor = Cursor::new(data);
611
612 let df = CsvReadOptions::default()
613 .with_has_header(false)
614 .map_parse_options(|parse_options| parse_options.with_separator(b'\t'))
615 .with_infer_schema_length(Some(10000))
616 .into_reader_with_file_handle(cursor)
617 .finish()?;
618
619 Ok(df)
620 }
621}
622
623impl Display for RegionSet {
624 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625 write!(f, "RegionSet with {} regions.", self.len())
626 }
627}
628
#[cfg(test)]
mod tests {
    //! Integration-style tests driven by fixture files under
    //! `../tests/data/regionset`. Several tests require network access
    //! (URL / BEDbase download paths).
    use super::*;

    use pretty_assertions::assert_eq;
    use rstest::*;

    /// Resolves a fixture file name to its path under the test data dir.
    fn get_test_path(file_name: &str) -> Result<PathBuf, Error> {
        let file_path: PathBuf = std::env::current_dir()
            .unwrap()
            .join("../tests/data/regionset")
            .join(file_name);
        Ok(file_path)
    }

    #[rstest]
    fn test_open_from_path() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        assert!(RegionSet::try_from(file_path.as_path()).is_ok());
    }

    #[rstest]
    fn test_open_from_string() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        assert!(RegionSet::try_from(file_path.to_str().unwrap()).is_ok());
    }

    // Requires network access.
    #[rstest]
    fn test_open_from_url() {
        let file_path = String::from(
            "https://www.encodeproject.org/files/ENCFF321QPN/@@download/ENCFF321QPN.bed.gz",
        );
        assert!(RegionSet::try_from(file_path).is_ok());
    }

    #[rstest]
    #[ignore = "Avoid BEDbase dependency in CI"]
    fn test_open_from_bedbase() {
        let bbid = String::from("6b2e163a1d4319d99bd465c6c78a9741");
        let region_set = RegionSet::try_from(bbid);
        // `assert!` instead of `assert_eq!(x.is_ok(), true)`.
        assert!(region_set.is_ok());
        // The BEDbase identifier is the set's own digest.
        assert_eq!(
            region_set.unwrap().identifier(),
            "6b2e163a1d4319d99bd465c6c78a9741"
        );
    }

    #[rstest]
    fn test_open_bed_gz() {
        let file_path = get_test_path("dummy.narrowPeak.bed.gz").unwrap();
        assert!(RegionSet::try_from(file_path.to_str().unwrap()).is_ok());
    }

    #[rstest]
    fn test_calculate_identifier() {
        let file_path = get_test_path("dummy.narrowPeak.bed.gz").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!("f0b2cf73383b53bd97ff525a0380f200", region_set.identifier());
    }

    // Round-trip: write .bed.gz, reload, digests must match.
    #[rstest]
    fn test_save_bed_gz() {
        let file_path = get_test_path("dummy.narrowPeak.bed.gz").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let tempdir = tempfile::tempdir().unwrap();

        let mut new_file_path = tempdir.keep();
        new_file_path.push("new_file.bed.gz");

        assert!(region_set.to_bed_gz(new_file_path.as_path()).is_ok());

        let new_region = RegionSet::try_from(new_file_path.as_path());
        assert!(new_region.is_ok());
        assert_eq!(new_region.unwrap().identifier(), region_set.identifier())
    }

    // Round-trip: write plain .bed, reload, digests must match.
    #[rstest]
    fn test_save_bed() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let tempdir = tempfile::tempdir().unwrap();

        let mut new_file_path = tempdir.keep();
        new_file_path.push("new_bedfile.bed");

        assert!(region_set.to_bed(new_file_path.as_path()).is_ok());

        let new_region = RegionSet::try_from(new_file_path.as_path());
        assert!(new_region.is_ok());
        assert_eq!(new_region.unwrap().identifier(), region_set.identifier())
    }

    #[cfg(feature = "bigbed")]
    #[rstest]
    fn test_save_bigbed() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let chrom_sizes_path: PathBuf = std::env::current_dir()
            .unwrap()
            .join("../tests/data/regionset/dummy_chrom_sizes");

        let tempdir = tempfile::tempdir().unwrap();
        let mut new_file_path = tempdir.keep();
        new_file_path.push("new.bigbed");

        assert!(region_set
            .to_bigbed(new_file_path.as_path(), chrom_sizes_path.as_path())
            .is_ok());
    }

    #[rstest]
    fn test_read_headers() {
        let file_path = get_test_path("dummy_headers.bed").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert!(region_set.header.is_some());
        assert_eq!(region_set.path.unwrap(), file_path);
    }

    #[rstest]
    fn test_is_empty() {
        let file_path = get_test_path("dummy_headers.bed").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert!(!region_set.is_empty());
    }

    #[rstest]
    fn test_file_digest() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.file_digest(), "6224c4d40832b3e0889250f061e01120");
        assert_eq!(region_set.identifier(), "f0b2cf73383b53bd97ff525a0380f200")
    }

    #[rstest]
    fn test_mean_region_width() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.mean_region_width(), 4.22)
    }

    // Files whose "header" is an unmarked column-name row must still load.
    #[rstest]
    fn test_open_file_with_incorrect_headers() {
        let file_path = get_test_path("dummy_incorrect_headers.bed").unwrap();
        let _region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();
    }

    #[rstest]
    fn test_chr_length() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();
        assert_eq!(*region_set.get_max_end_per_chr().get("chr1").unwrap(), 36);
        assert_eq!(region_set.get_max_end_per_chr().len(), 1)
    }

    #[rstest]
    fn test_total_nucleotides_function() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.nucleotides_length(), 38)
    }

    #[rstest]
    fn test_iter_chroms() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        assert_eq!(region_set.iter_chroms().collect::<Vec<_>>().len(), 1)
    }

    #[cfg(feature = "dataframe")]
    #[rstest]
    fn test_polars() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();
        let rs_polars = region_set.to_polars().unwrap();
        println!("Number of columns: {:?}", rs_polars.get_columns().len());
        assert_eq!(rs_polars.get_columns().len(), 10);
    }

    #[rstest]
    fn test_calc_mid_points() {
        let file_path = get_test_path("dummy.narrowPeak").unwrap();
        let region_set = RegionSet::try_from(file_path.to_str().unwrap()).unwrap();

        let mid_points = region_set.calc_mid_points();
        assert_eq!(mid_points.get("chr1").unwrap().len(), 9);
        assert_eq!(mid_points.len(), 1);
        assert_eq!(
            mid_points
                .get("chr1")
                .unwrap()
                .iter()
                .min()
                .copied()
                .unwrap(),
            6u32
        );
    }
}