use clap::Parser;
use csv::{Writer, WriterBuilder};
use std::{
collections::HashMap,
fs::File,
io::{self, Write},
path::PathBuf,
};
use crate::{
data::{operations::FloatOperation, SerializableDatumType},
io::{
parsers::{Bed5Iterator, GenomicRangesParser},
tsv::BED_TSV,
TsvConfig,
},
merging_iterators::{MergingEmptyResultIterator, MergingResultIterator},
prelude::*,
ranges::{operations::adjust_range, GenomicRangeRecord, GenomicRangeRecordEmpty},
reporting::{CommandOutput, Report},
test_utilities::{random_granges, random_granges_mock_bed5},
unique_id::UniqueIdentifier,
Position, PositionOffset,
};
/// Build a tab-delimited, headerless CSV writer over either a file
/// (when `output` is given) or standard output.
pub fn build_tsv_writer(
    output: Option<impl Into<PathBuf>>,
) -> Result<Writer<Box<dyn Write>>, GRangesError> {
    // Pick the sink: a freshly created file, or stdout.
    let sink: Box<dyn Write> = match output.map(Into::into) {
        Some(path) => Box::new(File::create(path)?),
        None => Box::new(io::stdout()),
    };
    Ok(WriterBuilder::new()
        .delimiter(b'\t')
        .has_headers(false)
        .from_writer(sink))
}
/// Build a tab-delimited, headerless CSV writer like [`build_tsv_writer`],
/// but first emit any `#`-prefixed metadata lines and an optional
/// tab-joined header row taken from `config`.
pub fn build_tsv_writer_with_config(
    output: Option<impl Into<PathBuf>>,
    config: &TsvConfig,
) -> Result<Writer<Box<dyn Write>>, GRangesError> {
    // Pick the sink: a freshly created file, or stdout.
    let mut sink: Box<dyn Write> = match output.map(Into::into) {
        Some(path) => Box::new(File::create(path)?),
        None => Box::new(io::stdout()),
    };
    // Metadata rows are written as '#'-prefixed comment lines before any data.
    for metadata_row in config.metadata.iter().flatten() {
        writeln!(sink, "#{}", metadata_row)?;
    }
    // A single tab-joined header line, if configured.
    if let Some(headers) = &config.headers {
        writeln!(sink, "{}", headers.join("\t"))?;
    }
    Ok(WriterBuilder::new()
        .delimiter(b'\t')
        .has_headers(false)
        .from_writer(sink))
}
/// How a command should process its input ranges.
#[derive(Clone)]
pub enum ProcessingMode {
    /// Process records one at a time, without loading the whole file.
    Streaming,
    /// Load all ranges into an in-memory `GRanges` first.
    InMemory,
}
/// Adjust each range in `bedfile` by `-both` on the start and `+both` on the
/// end, clamping to `[0, sequence length]` from `seqlens`.
///
/// With `sort == false` the file is processed in a single streaming pass and
/// ranges whose adjusted width is ≤ 0 are dropped (counted in the report).
/// With `sort == true` the whole file is loaded into a `GRanges` (which sorts
/// by construction), adjusted, and written out; no ranges are dropped there.
///
/// # Errors
/// Returns an error for unreadable files, sequences missing from `seqlens`,
/// or unsupported range-file formats (sorted mode only).
pub fn granges_adjust(
    bedfile: &PathBuf,
    seqlens: &PathBuf,
    both: PositionOffset,
    output: Option<&PathBuf>,
    sort: bool,
) -> Result<CommandOutput<()>, GRangesError> {
    let genome = read_seqlens(seqlens)?;
    let mut writer = build_tsv_writer(output)?;
    let mut report = Report::new();
    let mut skipped_ranges = 0;
    if !sort {
        // Streaming mode: adjust and serialize record by record.
        let bedlike_iterator = BedlikeIterator::new(bedfile)?;
        for record in bedlike_iterator {
            let range = record?;
            let seqname = &range.seqname;
            let length = *genome
                .get(seqname)
                .ok_or(GRangesError::MissingSequence(seqname.to_string()))?;
            // `adjust_range` returns None when the adjusted width is ≤ 0.
            let possibly_adjusted_range = adjust_range(range, -both, both, length);
            if let Some(range_adjusted) = possibly_adjusted_range {
                writer.serialize(range_adjusted)?;
            } else {
                skipped_ranges += 1;
            }
        }
        // BUGFIX: this report used to be added *inside* the loop, appending a
        // duplicate issue line for every record processed after the first
        // skip. Report the total exactly once, after the pass.
        if skipped_ranges > 0 {
            report.add_issue(format!(
                "{} ranges were removed because their widths after adjustment were ≤ 0",
                skipped_ranges
            ))
        }
    } else {
        // In-memory mode: load into the appropriate GRanges variant for the
        // detected format, adjust all ranges, and write.
        let ranges_iter = GenomicRangesFile::parsing_iterator(bedfile)?;
        match ranges_iter {
            GenomicRangesParser::Bed3(iter) => {
                let gr = GRangesEmpty::from_iter(iter, &genome)?;
                gr.adjust_ranges(-both, both)
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Bed4(iter) => {
                let gr = GRanges::from_iter(iter, &genome)?;
                gr.adjust_ranges(-both, both)
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Bed5(iter) => {
                let gr = GRanges::from_iter(iter, &genome)?;
                gr.adjust_ranges(-both, both)
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Bedlike(iter) => {
                // Bedlike rows carry an optional data column; unwrap it so the
                // GRanges data container holds concrete values.
                let gr = GRanges::from_iter(iter.try_unwrap_data(), &genome)?;
                gr.adjust_ranges(-both, both)
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Unsupported => {
                return Err(GRangesError::UnsupportedGenomicRangesFileFormat)
            }
        }
    }
    Ok(CommandOutput::new((), Some(report)))
}
/// Filter (semi-join) ranges in `left_path` by overlap with ranges in
/// `right_path`: only left ranges overlapping at least one right range are
/// written to `output`. BED3 and bedlike inputs are supported in any
/// combination; other formats return `UnsupportedGenomicRangesFileFormat`.
///
/// If `skip_missing` is true, records on sequences absent from `seqlens` are
/// silently dropped before construction; otherwise such records error.
pub fn granges_filter(
    seqlens: &PathBuf,
    left_path: &PathBuf,
    right_path: &PathBuf,
    output: Option<&PathBuf>,
    skip_missing: bool,
) -> Result<CommandOutput<()>, GRangesError> {
    let genome = read_seqlens(seqlens)?;
    let seqnames: Vec<String> = genome.keys().cloned().collect();
    let left_iter = GenomicRangesFile::parsing_iterator(left_path)?;
    let right_iter = GenomicRangesFile::parsing_iterator(right_path)?;
    // The four arms differ only in whether each side carries a data column
    // (bedlike -> GRanges with unwrapped data) or not (BED3 -> GRangesEmpty).
    match (left_iter, right_iter) {
        (GenomicRangesParser::Bed3(left), GenomicRangesParser::Bed3(right)) => {
            let left_gr;
            let right_gr;
            if skip_missing {
                left_gr = GRangesEmpty::from_iter(left.retain_seqnames(&seqnames), &genome)?;
                right_gr = GRangesEmpty::from_iter(right.retain_seqnames(&seqnames), &genome)?;
            } else {
                left_gr = GRangesEmpty::from_iter(left, &genome)?;
                right_gr = GRangesEmpty::from_iter(right, &genome)?;
            }
            // Index the right side into interval trees, then keep only
            // overlapping left ranges.
            let right_gr = right_gr.into_coitrees()?;
            let semijoin = left_gr.filter_overlaps(&right_gr)?;
            semijoin.write_to_tsv(output, &BED_TSV)?;
            Ok(CommandOutput::new((), None))
        }
        (GenomicRangesParser::Bed3(left), GenomicRangesParser::Bedlike(right)) => {
            let left_gr;
            let right_gr;
            if skip_missing {
                left_gr = GRangesEmpty::from_iter(left.retain_seqnames(&seqnames), &genome)?;
                right_gr = GRanges::from_iter(
                    right.try_unwrap_data().retain_seqnames(&seqnames),
                    &genome,
                )?;
            } else {
                left_gr = GRangesEmpty::from_iter(left, &genome)?;
                right_gr = GRanges::from_iter(right.try_unwrap_data(), &genome)?;
            }
            let right_gr = right_gr.into_coitrees()?;
            let semijoin = left_gr.filter_overlaps(&right_gr)?;
            semijoin.write_to_tsv(output, &BED_TSV)?;
            Ok(CommandOutput::new((), None))
        }
        (GenomicRangesParser::Bedlike(left), GenomicRangesParser::Bed3(right)) => {
            let left_gr;
            let right_gr;
            if skip_missing {
                left_gr =
                    GRanges::from_iter(left.try_unwrap_data().retain_seqnames(&seqnames), &genome)?;
                right_gr = GRangesEmpty::from_iter(right.retain_seqnames(&seqnames), &genome)?;
            } else {
                left_gr = GRanges::from_iter(left.try_unwrap_data(), &genome)?;
                right_gr = GRangesEmpty::from_iter(right, &genome)?;
            }
            let right_gr = right_gr.into_coitrees()?;
            let semijoin = left_gr.filter_overlaps(&right_gr)?;
            semijoin.write_to_tsv(output, &BED_TSV)?;
            Ok(CommandOutput::new((), None))
        }
        (GenomicRangesParser::Bedlike(left), GenomicRangesParser::Bedlike(right)) => {
            let left_gr;
            let right_gr;
            if skip_missing {
                left_gr =
                    GRanges::from_iter(left.try_unwrap_data().retain_seqnames(&seqnames), &genome)?;
                right_gr = GRanges::from_iter(
                    right.try_unwrap_data().retain_seqnames(&seqnames),
                    &genome,
                )?;
            } else {
                left_gr = GRanges::from_iter(left.try_unwrap_data(), &genome)?;
                right_gr = GRanges::from_iter(right.try_unwrap_data(), &genome)?;
            }
            let right_gr = right_gr.into_coitrees()?;
            let intersection = left_gr.filter_overlaps(&right_gr)?;
            intersection.write_to_tsv(output, &BED_TSV)?;
            Ok(CommandOutput::new((), None))
        }
        _ => Err(GRangesError::UnsupportedGenomicRangesFileFormat),
    }
}
/// Write the flanking ranges (of widths `left` / `right`, either optional) of
/// every range in `bedfile`, truncated at the sequence boundaries in `seqlens`.
///
/// `mode` selects between an in-memory implementation (loads the whole file
/// into a `GRanges`) and a streaming one (constant memory, record at a time).
/// Several format/mode combinations are `unimplemented!()`.
pub fn granges_flank(
    seqlens: &PathBuf,
    bedfile: &PathBuf,
    left: Option<Position>,
    right: Option<Position>,
    output: Option<&PathBuf>,
    skip_missing: bool,
    mode: ProcessingMode,
) -> Result<CommandOutput<()>, GRangesError> {
    let genome = read_seqlens(seqlens)?;
    let seqnames: Vec<String> = genome.keys().cloned().collect();
    let ranges_iter = GenomicRangesFile::parsing_iterator(bedfile)?;
    match mode {
        ProcessingMode::InMemory => match ranges_iter {
            GenomicRangesParser::Bed3(iter) => {
                // `skip_missing` drops records on sequences absent from the genome.
                let gr = if skip_missing {
                    GRangesEmpty::from_iter(iter.retain_seqnames(&seqnames), &genome)?
                } else {
                    GRangesEmpty::from_iter(iter, &genome)?
                };
                gr.flanking_ranges(left, right)?
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Bed4(iter) => {
                let gr = if skip_missing {
                    GRanges::from_iter(iter.retain_seqnames(&seqnames), &genome)?
                } else {
                    GRanges::from_iter(iter, &genome)?
                };
                gr.flanking_ranges(left, right)?
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Bed5(_iter) => {
                unimplemented!()
            }
            GenomicRangesParser::Bedlike(iter) => {
                let gr = if skip_missing {
                    GRanges::from_iter(iter.try_unwrap_data().retain_seqnames(&seqnames), &genome)?
                } else {
                    GRanges::from_iter(iter.try_unwrap_data(), &genome)?
                };
                gr.flanking_ranges(left, right)?
                    .write_to_tsv(output, &BED_TSV)?
            }
            GenomicRangesParser::Unsupported => {
                return Err(GRangesError::UnsupportedGenomicRangesFileFormat)
            }
        },
        ProcessingMode::Streaming => {
            let mut writer = build_tsv_writer(output)?;
            match ranges_iter {
                GenomicRangesParser::Bed3(iter) => {
                    // NOTE(review): the skip_missing branch requests flanks as
                    // GenomicRangeRecord<String> while the else branch requests
                    // GenomicRangeRecordEmpty; for a BED3 (data-less) input both
                    // would be expected to be the empty record type — confirm
                    // this asymmetry is intentional.
                    if skip_missing {
                        for record in iter.retain_seqnames(&seqnames) {
                            let range = record?;
                            let seqname = &range.seqname;
                            let length = *genome
                                .get(seqname)
                                .ok_or(GRangesError::MissingSequence(seqname.to_string()))?;
                            let flanking_ranges = range
                                .flanking_ranges::<GenomicRangeRecord<String>>(left, right, length);
                            for flanking_range in flanking_ranges {
                                writer.serialize(flanking_range)?;
                            }
                        }
                    } else {
                        for record in iter {
                            let range = record?;
                            let seqname = &range.seqname;
                            let length = *genome
                                .get(seqname)
                                .ok_or(GRangesError::MissingSequence(seqname.to_string()))?;
                            let flanking_ranges = range
                                .flanking_ranges::<GenomicRangeRecordEmpty>(left, right, length);
                            for flanking_range in flanking_ranges {
                                writer.serialize(flanking_range)?;
                            }
                        }
                    }
                }
                GenomicRangesParser::Bed4(_iter) => {
                    unimplemented!()
                }
                GenomicRangesParser::Bed5(_iter) => {
                    unimplemented!()
                }
                GenomicRangesParser::Bedlike(iter) => {
                    // NOTE(review): same type-parameter asymmetry as the Bed3
                    // streaming branch above — confirm intent.
                    if skip_missing {
                        for record in iter.retain_seqnames(&seqnames) {
                            let range = record?;
                            let seqname = &range.seqname;
                            let length = *genome
                                .get(seqname)
                                .ok_or(GRangesError::MissingSequence(seqname.to_string()))?;
                            let flanking_ranges = range
                                .flanking_ranges::<GenomicRangeRecord<String>>(left, right, length);
                            for flanking_range in flanking_ranges {
                                writer.serialize(flanking_range)?;
                            }
                        }
                    } else {
                        for record in iter {
                            let range = record?;
                            let seqname = &range.seqname;
                            let length = *genome
                                .get(seqname)
                                .ok_or(GRangesError::MissingSequence(seqname.to_string()))?;
                            let flanking_ranges = range
                                .flanking_ranges::<GenomicRangeRecordEmpty>(left, right, length);
                            for flanking_range in flanking_ranges {
                                writer.serialize(flanking_range)?;
                            }
                        }
                    }
                }
                GenomicRangesParser::Unsupported => {
                    return Err(GRangesError::UnsupportedGenomicRangesFileFormat)
                }
            }
        }
    }
    Ok(CommandOutput::new((), None))
}
/// For every left (BED3) range, apply each of `operations` to the scores of
/// all overlapping right (BED5) ranges, writing one result column per
/// operation.
///
/// Errors with `GRangesError::NoRows` if either input is empty.
pub fn granges_map(
    seqlens: impl Into<PathBuf>,
    left_path: &PathBuf,
    right_path: &PathBuf,
    operations: Vec<FloatOperation>,
    output: Option<&PathBuf>,
    skip_missing: bool,
) -> Result<CommandOutput<()>, GRangesError> {
    let genome = read_seqlens(seqlens)?;
    let seqnames: Vec<String> = genome.keys().cloned().collect();
    let left_iter = Bed3Iterator::new(left_path)?;
    let right_iter = Bed5Iterator::new(right_path)?;

    // `skip_missing` drops records on sequences absent from the genome file.
    let (left_gr, right_gr) = if skip_missing {
        (
            GRangesEmpty::from_iter(left_iter.retain_seqnames(&seqnames), &genome)?,
            GRanges::from_iter(right_iter.retain_seqnames(&seqnames), &genome)?,
        )
    } else {
        (
            GRangesEmpty::from_iter(left_iter, &genome)?,
            GRanges::from_iter(right_iter, &genome)?,
        )
    };

    if left_gr.is_empty() || right_gr.is_empty() {
        return Err(GRangesError::NoRows);
    }

    // Index the right ranges into interval trees and keep only the score column.
    let right_gr = right_gr
        .into_coitrees()?
        .map_data(|bed5_cols| bed5_cols.score)?;

    let left_join_gr = left_gr.left_overlaps(&right_gr)?;
    let result_gr = left_join_gr.map_joins(|join_data| {
        // Scores of all overlapping right ranges; missing (None) scores drop out.
        let mut overlap_scores: Vec<f64> = join_data.right_data.into_iter().flatten().collect();
        // One serializable result per requested operation.
        operations
            .iter()
            .map(|operation| operation.run(&mut overlap_scores).into_serializable(&BED_TSV))
            .collect::<Vec<SerializableDatumType>>()
    })?;

    result_gr.write_to_tsv(output, &BED_TSV)?;
    Ok(CommandOutput::new((), None))
}
/// Write tiling windows of `width` (and optional `step`) across the genome
/// as BED3; `chop` truncates the final window of each sequence to fit.
pub fn granges_windows(
    seqlens: impl Into<PathBuf>,
    width: Position,
    step: Option<Position>,
    chop: bool,
    output: Option<impl Into<PathBuf>>,
) -> Result<CommandOutput<()>, GRangesError> {
    let genome = read_seqlens(seqlens)?;
    let windows = GRangesEmpty::from_windows(&genome, width, step, chop)?;
    windows.write_to_tsv(output, &BED_TSV)?;
    Ok(CommandOutput::new((), None))
}
/// Generate `num` random ranges over the genome and write them as BED3
/// (or mock BED5 when `bed5` is set), optionally sorted.
pub fn granges_random_bed(
    seqlens: impl Into<PathBuf>,
    num: usize,
    output: Option<impl Into<PathBuf>>,
    sort: bool,
    bed5: bool,
) -> Result<CommandOutput<()>, GRangesError> {
    let genome = read_seqlens(seqlens)?;
    // The two branches build different GRanges types, so each writes directly.
    if bed5 {
        let gr = random_granges_mock_bed5(&genome, num)?;
        let gr = if sort { gr.sort() } else { gr };
        gr.write_to_tsv(output, &BED_TSV)?;
    } else {
        let gr = random_granges(&genome, num)?;
        let gr = if sort { gr.sort() } else { gr };
        gr.write_to_tsv(output, &BED_TSV)?;
    }
    Ok(CommandOutput::new((), None))
}
// Arguments for the `merge` subcommand.
// NOTE: field comments deliberately use `//` rather than `///` — clap turns
// doc comments into --help text, and we must not change the CLI output.
#[derive(Parser)]
pub struct Merge {
    // Input ranges file (BED3/BED4/BED5 detected from content).
    #[arg(short, long, required = true)]
    bedfile: PathBuf,
    // Merge ranges within this distance of one another (0 = only
    // overlapping/book-ended ranges).
    #[clap(short, long, default_value_t = 0)]
    distance: PositionOffset,
    // Operation used to aggregate BED5 scores of merged ranges.
    #[clap(short, long, value_parser = clap::value_parser!(FloatOperation))]
    func: Option<FloatOperation>,
    // Output file; stdout when omitted.
    #[arg(short, long)]
    output: Option<PathBuf>,
}
impl Merge {
    /// Merge ranges within `--distance` of one another, aggregating the data
    /// column where the format has one: BED4 names are comma-joined, BED5
    /// scores are reduced with the `--func` operation.
    ///
    /// # Errors
    /// Returns parsing/IO errors from the input, and
    /// `UnsupportedGenomicRangesFileFormat` for unrecognized formats.
    ///
    /// # Panics
    /// Panics (with an explanatory message) if the input is BED5 and no
    /// `--func` was supplied.
    pub fn run(&self) -> Result<CommandOutput<()>, GRangesError> {
        let bedfile = &self.bedfile;
        let distance = &self.distance;
        let ranges_iter = GenomicRangesFile::parsing_iterator(bedfile)?;
        let func = &self.func;
        let mut writer = build_tsv_writer(self.output.as_ref())?;
        match ranges_iter {
            GenomicRangesParser::Bed3(iter) => {
                // BED3 has no data column: merge ranges only.
                let merging_iter = MergingEmptyResultIterator::new(iter, *distance);
                for result in merging_iter {
                    let record = result?;
                    writer.serialize(record)?;
                }
                Ok(CommandOutput::new((), None))
            }
            GenomicRangesParser::Bed4(iter) => {
                // BED4: comma-join the names of the merged ranges.
                let merging_iter = MergingResultIterator::new(iter, *distance, |data| {
                    data.into_iter()
                        .map(|x| x.name)
                        .collect::<Vec<_>>()
                        .join(",")
                });
                for result in merging_iter {
                    let record = result?;
                    writer.serialize(record)?;
                }
                Ok(CommandOutput::new((), None))
            }
            GenomicRangesParser::Bed5(iter) => {
                // BED5: aggregate the (optional) scores of merged ranges with
                // the user-supplied operation.
                let merging_iter = MergingResultIterator::new(iter, *distance, |data| {
                    let mut scores: Vec<f64> = data
                        .into_iter()
                        .filter_map(|bed5_cols| bed5_cols.score)
                        .collect();
                    // BUGFIX: was a bare `unwrap()`, which panicked with no
                    // explanation when --func was omitted for a BED5 input.
                    func.as_ref()
                        .expect("--func is required when merging BED5 files")
                        .run(&mut scores)
                });
                for result in merging_iter {
                    let record = result?;
                    writer.serialize(record)?;
                }
                Ok(CommandOutput::new((), None))
            }
            GenomicRangesParser::Bedlike(_iter) => {
                todo!()
            }
            GenomicRangesParser::Unsupported => {
                Err(GRangesError::UnsupportedGenomicRangesFileFormat)
            }
        }
    }
}
// Arguments for the `filter-chroms` subcommand.
// NOTE: field comments deliberately use `//` rather than `///` — clap turns
// doc comments into --help text, and we must not change the CLI output.
#[derive(Parser)]
pub struct FilterChroms {
    // Tab-delimited sequence-lengths (genome) file.
    #[arg(short, long, required = true)]
    genome: PathBuf,
    // Input bedlike file to filter.
    #[arg(short, long, required = true)]
    bedfile: PathBuf,
    // Output file; stdout when omitted.
    #[arg(short, long)]
    output: Option<PathBuf>,
}
impl FilterChroms {
    /// Stream `bedfile`, writing only records whose sequence name appears in
    /// the genome file.
    pub fn run(&self) -> Result<CommandOutput<()>, GRangesError> {
        let genome = read_seqlens(&self.genome)?;
        let records = BedlikeIterator::new(&self.bedfile)?;
        let mut writer = build_tsv_writer(self.output.as_ref())?;
        for record in records {
            let range = record?;
            // Keep the record only if its chromosome is a known sequence.
            if genome.contains_key(&range.seqname) {
                writer.serialize(range)?;
            }
        }
        Ok(CommandOutput::new((), None))
    }
}
/// Transpose a row-major matrix (`Vec` of rows) into column-major order.
///
/// The column count is taken from the first row; every row must have at least
/// that many elements (extra elements in longer rows are ignored).
///
/// An empty matrix transposes to an empty matrix (the original implementation
/// asserted and panicked on this case).
///
/// # Panics
/// Panics if any row is shorter than the first row.
fn transpose<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
    // Robustness: an empty input has a well-defined transpose — itself.
    if v.is_empty() {
        return Vec::new();
    }
    let len = v[0].len();
    let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect();
    (0..len)
        .map(|_| {
            iters
                .iter_mut()
                .map(|n| {
                    n.next()
                        .expect("transpose: ragged matrix — every row must be at least as long as the first")
                })
                .collect::<Vec<T>>()
        })
        .collect()
}
// Arguments for the `feature-density` subcommand.
// NOTE: field comments deliberately use `//` rather than `///` — clap turns
// doc comments into --help text, and we must not change the CLI output.
#[derive(Parser)]
pub struct FeatureDensity {
    // Tab-delimited sequence-lengths (genome) file.
    #[arg(short, long, required = true)]
    genome: PathBuf,
    // BED4 input; the name column is the feature label.
    #[arg(short, long, required = true)]
    bedfile: PathBuf,
    // Window width.
    #[arg(short, long)]
    width: Position,
    // Window step; defaults to non-overlapping windows when omitted.
    #[arg(short, long)]
    step: Option<Position>,
    // Truncate the final window of each sequence to fit.
    #[arg(short, long)]
    chop: bool,
    // Count basepairs toward exclusive feature combinations instead of
    // toward each feature independently.
    #[arg(short, long)]
    exclusive: bool,
    // Write a header row (uses short flag -l).
    #[arg(short = 'l', long)]
    headers: bool,
    // Output file; stdout when omitted.
    #[arg(short, long)]
    output: Option<PathBuf>,
}
// Windows paired with one row per window, each row holding one `Position`
// total per feature column.
type GRangesFeatureMatrix = GRanges<VecRangesIndexed, Vec<Vec<Position>>>;
impl FeatureDensity {
    /// Compute per-window feature density: for tiling windows across the
    /// genome, the total number of basepairs overlapping each feature.
    ///
    /// Returns the windows carrying a per-window vector of totals (one column
    /// per feature) plus the feature names in column order.
    pub fn feature_density(&self) -> Result<(GRangesFeatureMatrix, Vec<String>), GRangesError> {
        let bedfile = &self.bedfile;
        let genome = read_seqlens(&self.genome)?;
        let bed4_iter = Bed4Iterator::new(bedfile)?;
        // Bucket the BED4 records by feature name, dropping the data column.
        let mut records_by_features: HashMap<String, Vec<GenomicRangeRecordEmpty>> =
            HashMap::new();
        for result in bed4_iter {
            let range = result?;
            let feature = &range.data.name;
            let vec = records_by_features.entry(feature.to_string()).or_default();
            vec.push(range.into_empty())
        }
        // Merge overlapping/book-ended ranges (distance 0) within each feature
        // and index each feature's ranges into interval trees.
        let mut gr_by_features: HashMap<String, GRangesEmpty<COITreesEmpty>> = HashMap::new();
        for (feature, ranges) in records_by_features.into_iter() {
            let merging_iter = MergingEmptyIterator::new(ranges, 0);
            let gr = GRangesEmpty::from_iter_ok(merging_iter, &genome)?.into_coitrees()?;
            assert!(!gr_by_features.contains_key(&feature));
            gr_by_features.insert(feature, gr);
        }
        // One column of per-window totals per feature.
        // NOTE(review): column order comes from `keys()` here but the matrix
        // is built from `into_iter()` below; std's HashMap yields the same
        // order for both on an unmodified map in practice, but this is not a
        // documented API guarantee — confirm.
        let windows = GRangesEmpty::from_windows(&genome, self.width, self.step, self.chop)?;
        let features: Vec<_> = gr_by_features.keys().cloned().collect();
        let mut feature_matrix = Vec::new();
        for (_feature, gr) in gr_by_features.into_iter() {
            let window_counts = windows
                .clone()
                .left_overlaps(&gr)?
                .map_joins(|joins| {
                    // Total overlapping basepairs for this window.
                    let total_overlaps: Position = joins.join.overlaps().iter().cloned().sum();
                    total_overlaps
                })?
                .take_data()?;
            feature_matrix.push(window_counts);
        }
        // Feature-major -> window-major, then attach to a fresh set of windows.
        let feature_matrix = transpose(feature_matrix);
        let windows = GRangesEmpty::from_windows(&genome, self.width, self.step, self.chop)?;
        let windows = windows.into_granges_data(feature_matrix)?;
        Ok((windows, features))
    }

    /// Like `feature_density`, but each window basepair is counted toward the
    /// *exclusive combination* of features covering it (e.g. "exon" vs
    /// "exon,intron"), with one column per observed combination.
    pub fn feature_density_exclusive(
        &self,
    ) -> Result<(GRangesFeatureMatrix, Vec<String>), GRangesError> {
        let bedfile = &self.bedfile;
        let genome = read_seqlens(&self.genome)?;
        let bed4_iter = Bed4Iterator::new(bedfile)?;
        // Load all ranges keyed by feature name; the data container maps
        // feature names to integer indices.
        let mut gr = GRanges::new_vec_keyed(&genome);
        for result in bed4_iter {
            let range = result?;
            gr.push_range_with_key(&range.seqname, range.start, range.end, &range.data.name)?;
        }
        let gr = gr.into_coitrees()?;
        // Keep the feature-name <-> index mapping for building labels later.
        let feature_map = gr.data().ok_or(GRangesError::NoDataContainer)?.clone();
        let windows = GRangesEmpty::from_windows(&genome, self.width, self.step, self.chop)?;
        // Per window: overlap width totals keyed by the (sorted) set of
        // feature indices covering that stretch.
        let windows_overlaps = windows.left_overlaps(&gr)?.map_joins(|mut join| {
            let ranges = join.join.reduce_ranges();
            let mut feature_overlaps: HashMap<Vec<usize>, _> = HashMap::new();
            for range in ranges.iter() {
                // Sort indices so the same feature set always hashes the same.
                let mut key: Vec<usize> = range.indices().iter().map(|x| x.unwrap()).collect();
                key.sort();
                *feature_overlaps.entry(key).or_insert(0) += range.width();
            }
            feature_overlaps
        })?;
        // Assign a stable column index to every feature set seen in any window.
        let mut observed_sets = UniqueIdentifier::new();
        let data = windows_overlaps
            .data()
            .ok_or(GRangesError::NoDataContainer)?;
        for feature_counts in data.iter() {
            for feature_set in feature_counts.keys() {
                observed_sets.get_or_insert(feature_set);
            }
        }
        let nsets = observed_sets.len();
        // Densify: one count row per window, one column per observed set
        // (0 where the window has no overlap with that set).
        let window_counts = windows_overlaps.map_data(|feature_counts| {
            let mut counts = vec![0; nsets];
            for (i, feature_set) in observed_sets.keys().enumerate() {
                counts[i] = *feature_counts.get(feature_set).unwrap_or(&0);
            }
            counts
        })?;
        // Reorder columns by descending total coverage.
        let matrix = window_counts.data().ok_or(GRangesError::NoDataContainer)?;
        let col_totals = column_totals(matrix);
        let sorted_indices = sorted_indices_by_values(&col_totals);
        let window_counts = window_counts.map_data(|row| {
            let row: Vec<_> = sorted_indices.iter().map(|i| row[*i]).collect();
            row
        })?;
        // Human-readable labels like "exon,intron", reordered to match the
        // column order chosen above.
        let feature_sets: Vec<String> = observed_sets
            .keys()
            .map(|indices_key| {
                let labels: Vec<_> = indices_key
                    .iter()
                    .map(|idx| feature_map.get_key(*idx).unwrap().clone())
                    .collect();
                labels.join(",")
            })
            .collect();
        let feature_sets: Vec<_> = sorted_indices
            .iter()
            .map(|i| feature_sets[*i].clone())
            .collect();
        Ok((window_counts, feature_sets))
    }

    /// Entry point: dispatch to plain or exclusive density and write a TSV,
    /// optionally with a header row.
    pub fn run(&self) -> Result<CommandOutput<()>, GRangesError> {
        if !self.exclusive {
            let (window_counts, features) = self.feature_density()?;
            if !self.headers {
                window_counts.write_to_tsv(self.output.as_ref(), &BED_TSV)?;
            } else {
                let mut headers = vec!["chrom".to_string(), "start".to_string(), "end".to_string()];
                headers.extend(features);
                let config = TsvConfig {
                    no_value_string: "NA".to_string(),
                    headers: Some(headers),
                    metadata: None,
                };
                window_counts.write_to_tsv(self.output.as_ref(), &config)?;
            }
        } else {
            // NOTE(review): exclusive mode always writes headers, ignoring
            // the --headers flag — confirm this asymmetry is intended.
            let (window_counts, feature_sets) = self.feature_density_exclusive()?;
            let mut headers = vec!["chrom".to_string(), "start".to_string(), "end".to_string()];
            headers.extend(feature_sets);
            let config = TsvConfig {
                no_value_string: "NA".to_string(),
                headers: Some(headers),
                metadata: None,
            };
            window_counts.write_to_tsv(self.output.as_ref(), &config)?;
        }
        Ok(CommandOutput::new((), None))
    }
}
/// Sum each column of a row-major matrix, returning one total per column.
///
/// The column count is taken from the first row; extra values in longer rows
/// are ignored, and shorter rows contribute only to the columns they have.
/// Returns an empty vector when the matrix has no rows or no columns.
// IDIOM: takes `&[Vec<Position>]` rather than `&Vec<Vec<Position>>` (clippy
// `ptr_arg`); existing call sites coerce automatically.
fn column_totals(matrix: &[Vec<Position>]) -> Vec<Position> {
    if matrix.is_empty() || matrix[0].is_empty() {
        return Vec::new();
    }
    let num_columns = matrix[0].len();
    let mut totals = vec![0; num_columns];
    for row in matrix {
        // `take(num_columns)` replaces the per-element `i < totals.len()`
        // bounds test with the same truncation behavior.
        for (i, &value) in row.iter().enumerate().take(num_columns) {
            totals[i] += value;
        }
    }
    totals
}
/// Indices of `values` ordered from largest to smallest value; ties keep
/// their original relative order (stable sort).
fn sorted_indices_by_values(values: &[Position]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..values.len()).collect();
    order.sort_by(|&a, &b| values[b].cmp(&values[a]));
    order
}