use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use clap::Parser;
use crossbeam_channel::unbounded;
use thiserror::Error;
use crate::utils::merge::merge_sections_many;
use crate::utils::reopen::ReopenableFile;
use crate::{BBIDataProcessor, BBIReadError, BigWigRead, BigWigWrite, ProcessDataError};
use crate::{BBIDataSource, BBIProcessError, Value};
use tokio::runtime::{self, Runtime};
use super::BBIWriteArgs;
#[derive(Clone, Debug, PartialEq, Parser)]
#[command(
name = "bigwigmerge",
about = "Merges multiple bigwigs.",
long_about = None,
)]
pub struct BigWigMergeArgs {
pub output: String,
#[arg(short = 'b')]
pub bigwig: Vec<String>,
#[arg(short = 'l')]
pub list: Vec<String>,
#[arg(long)]
#[arg(default_value_t = 0.0)]
pub threshold: f32,
#[arg(long)]
pub adjust: Option<f32>,
#[arg(long)]
pub clip: Option<f32>,
#[arg(long)]
#[arg(default_value_t = false)]
pub max: bool,
#[arg(long)]
pub output_type: Option<String>,
#[command(flatten)]
pub write_args: BBIWriteArgs,
}
pub fn bigwigmerge(args: BigWigMergeArgs) -> Result<(), Box<dyn Error>> {
let output = args.output;
let mut bigwigs: Vec<BigWigRead<ReopenableFile>> = vec![];
for name in args.bigwig {
match BigWigRead::open_file(&name) {
Ok(bw) => bigwigs.push(bw),
Err(e) => {
eprintln!("Error when opening bigwig ({}): {:?}", name, e);
return Ok(());
}
}
}
for list in args.list {
let list_file = match File::open(list) {
Ok(f) => f,
Err(e) => {
eprintln!("Couldn't open file: {:?}", e);
return Ok(());
}
};
let lines = BufReader::new(list_file).lines();
for line in lines {
let name = line?;
match BigWigRead::open_file(&name) {
Ok(bw) => bigwigs.push(bw),
Err(e) => {
eprintln!("Error when opening bigwig ({}): {:?}", name, e);
return Ok(());
}
}
}
}
let nthreads = args.write_args.nthreads;
let (iter, chrom_map) = get_merged_vals(bigwigs, 10, args.threshold, args.adjust, args.clip)?;
enum OutputType {
BigWig,
BedGraph,
}
let output_type = match (args.output_type, &output) {
(None, output)
if output.to_lowercase().ends_with(".bw")
|| output.to_lowercase().ends_with(".bigWig") =>
{
OutputType::BigWig
}
(None, output) if output.to_lowercase().ends_with(".bedGraph") => OutputType::BedGraph,
(Some(output_type), _) if output_type.to_lowercase() == "bigwig" => OutputType::BigWig,
(Some(output_type), _) if output_type.to_lowercase() == "bedgraph" => OutputType::BedGraph,
_ => {
eprintln!("Unable to determine output file format. \
The output file must either in with `.bw` or `.bigWig` for bigwigs or `.bedGraph` for bedGraphs; or \
`--output-type` must be set to either `bigwig` or `bedgraph`.");
return Ok(());
}
};
match output_type {
OutputType::BigWig => {
let outb = BigWigWrite::create_file(output, chrom_map)?;
let runtime = if nthreads == 1 {
runtime::Builder::new_current_thread().build().unwrap()
} else {
runtime::Builder::new_multi_thread()
.worker_threads(nthreads)
.build()
.unwrap()
};
let all_values = ChromGroupReadImpl {
iter: Box::new(iter),
};
outb.write(all_values, runtime)?;
}
OutputType::BedGraph => {
use std::io::Write;
let bedgraph = File::create(output)?;
let mut writer = io::BufWriter::new(bedgraph);
for v in iter {
let (chrom, _, mut values) = v?;
loop {
let val = match values.iter.next() {
Some(Ok(v)) => v,
Some(Err(e)) => Err(e)?,
None => break,
};
writer.write_fmt(format_args!(
"{}\t{}\t{}\t{}\n",
chrom, val.start, val.end, val.value
))?;
}
}
}
}
Ok(())
}
pub struct MergingValues {
iter: std::iter::Peekable<Box<dyn Iterator<Item = Result<Value, MergingValuesError>> + Send>>,
}
impl MergingValues {
pub fn new<I: 'static>(
iters: Vec<I>,
threshold: f32,
adjust: Option<f32>,
clip: Option<f32>,
) -> Self
where
I: Iterator<Item = Result<Value, MergingValuesError>> + Send,
{
let adjust = adjust.unwrap_or(0.0);
let iter: Box<dyn Iterator<Item = Result<Value, MergingValuesError>> + Send> = Box::new(
merge_sections_many(iters)
.map(move |x| {
x.map(|mut v| {
if let Some(clip) = clip {
v.value = clip.min(v.value);
}
v.value += adjust;
v
})
})
.filter(move |x| x.as_ref().map_or(true, |v| v.value > threshold)),
);
MergingValues {
iter: iter.peekable(),
}
}
}
#[derive(Error, Debug)]
pub enum MergingValuesError {
#[error("{}", .0)]
BBIReadError(#[from] BBIReadError),
#[error("{}", .0)]
MismatchedChroms(String),
#[error("{}", .0)]
Other(String),
#[error("{}", .0)]
IoError(#[from] io::Error),
}
pub fn get_merged_vals(
bigwigs: Vec<BigWigRead<ReopenableFile>>,
max_zooms: usize,
threshold: f32,
adjust: Option<f32>,
clip: Option<f32>,
) -> Result<
(
impl Iterator<Item = Result<(String, u32, MergingValues), MergingValuesError>>,
HashMap<String, u32>,
),
MergingValuesError,
> {
let (chrom_sizes, chrom_map) = {
let mut chrom_sizes = BTreeMap::new();
let mut chrom_map = HashMap::new();
for chrom in bigwigs
.iter()
.flat_map(BigWigRead::chroms)
.map(|c| c.name.clone())
{
if chrom_sizes.get(&chrom).is_some() {
continue;
}
let mut size = None;
let mut bws = Vec::with_capacity(bigwigs.len());
for w in bigwigs.iter() {
let chroms = w.chroms();
let res = chroms.iter().find(|v| v.name == chrom);
let res = match res {
Some(res) => res,
None => continue,
};
match size {
Some(all_size) => {
if all_size != res.length {
eprintln!("Chrom '{:?}' had different sizes in the bigwig files. (Are you using the same assembly?)", chrom);
return Err(MergingValuesError::MismatchedChroms(
"Invalid input (nonmatching chroms)".to_owned(),
));
}
}
None => {
size = Some(res.length);
}
}
bws.push((w.info().clone(), w.inner_read().path.clone()));
}
let size = size.unwrap();
chrom_sizes.insert(chrom.clone(), (size, bws));
chrom_map.insert(chrom.clone(), size);
}
(chrom_sizes, chrom_map)
};
const MAX_FDS: usize = 1000;
const PARALLEL_CHROMS: usize = 1;
let max_bw_fds: usize = MAX_FDS
- 1
- 1
- (1 + 1 + max_zooms + max_zooms ) * PARALLEL_CHROMS;
let iter = chrom_sizes.into_iter().map(move |(chrom, (size, bws))| {
if bws.len() > max_bw_fds {
eprintln!("Number of bigWigs to merge would exceed the maximum number of file descriptors. Splitting into chunks.");
let mut merges: Vec<Box<dyn Iterator<Item = Result<Value, MergingValuesError>> + Send>> = bws
.into_iter()
.map(|b| {
let f = ReopenableFile { file: File::open(&b.1)?, path: b.1 };
let b = BigWigRead::with_info(b.0, f);
let iter = b.get_interval_move(&chrom, 1, size).map(|i| i.map(|r| r.map_err(|e| MergingValuesError::BBIReadError(e))))?;
Ok(Box::new(iter) as Box<_>)
})
.collect::<Result<Vec<_>, BBIReadError>>()?;
while merges.len() > max_bw_fds {
merges = {
let len = merges.len();
let mut vals = merges.into_iter().peekable();
let mut merges: Vec<Box<dyn Iterator<Item = Result<Value, MergingValuesError>> + Send>> = Vec::with_capacity(len/max_bw_fds+1);
while vals.peek().is_some() {
let chunk = vals.by_ref().take(max_bw_fds).collect::<Vec<_>>();
let mut mergingvalues = MergingValues::new(chunk, threshold, adjust, clip);
let (sender, receiver) = unbounded::<Value>();
loop {
let val = match mergingvalues.iter.next() {
Some(Ok(v)) => v,
Some(Err(e)) => Err(e)?,
None => break,
};
sender.send(val).unwrap();
}
merges.push(Box::new(receiver.into_iter().map(Result::Ok)));
}
merges
};
}
let mergingvalues = MergingValues::new(merges, threshold, adjust, clip);
Ok((chrom, size, mergingvalues))
} else {
let iters: Vec<_> = bws
.into_iter()
.map(|b| {
let f = ReopenableFile { file: File::open(&b.1)?, path: b.1 };
let b = BigWigRead::with_info(b.0, f);
b.get_interval_move(&chrom, 1, size).map(|i| i.map(|r| r.map_err(|e| MergingValuesError::BBIReadError(e))))
})
.collect::<Result<Vec<_>, _>>()?;
let mergingvalues = MergingValues::new(iters, threshold, adjust, clip);
Ok((chrom, size, mergingvalues))
}
});
Ok((iter, chrom_map))
}
pub struct ChromGroupReadImpl {
pub iter:
Box<dyn Iterator<Item = Result<(String, u32, MergingValues), MergingValuesError>> + Send>,
}
impl BBIDataSource for ChromGroupReadImpl {
type Value = Value;
type Error = MergingValuesError;
fn process_to_bbi<
P: BBIDataProcessor<Value = Self::Value>,
StartProcessing: FnMut(String) -> Result<P, ProcessDataError>,
Advance: FnMut(P),
>(
&mut self,
runtime: &Runtime,
start_processing: &mut StartProcessing,
advance: &mut Advance,
) -> Result<(), BBIProcessError<Self::Error>> {
loop {
let next: Option<Result<(String, u32, MergingValues), MergingValuesError>> =
self.iter.next();
match next {
Some(Ok((chrom, _, mut group))) => {
let mut p = start_processing(chrom)?;
loop {
let current_val = match group.iter.next() {
Some(Ok(v)) => v,
Some(Err(e)) => Err(BBIProcessError::SourceError(e))?,
None => break,
};
let next_val = match group.iter.peek() {
Some(Ok(v)) => Some(v),
Some(Err(_)) | None => None,
};
let read = p.do_process(current_val, next_val);
runtime.block_on(read)?;
}
advance(p);
}
Some(Err(e)) => return Err(BBIProcessError::SourceError(e)),
None => break,
}
}
Ok(())
}
}