use std::collections::VecDeque;
use std::error::Error;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
use clap::Parser;
use crossbeam_channel::TryRecvError;
use crate::bed::bedparser::{parse_bed, BedFileStream, StreamingBedValues};
use crate::utils::file_view::FileView;
use crate::utils::misc::{name_for_bed_item, stats_for_bed_item, BigWigAverageOverBedEntry, Name};
use crate::utils::reopen::{Reopen, ReopenableFile};
use crate::utils::split_file_into_chunks_by_size;
use crate::utils::streaming_linereader::StreamingLineReader;
use crate::{BBIFileRead, BBIReadError, BigWigRead};
#[derive(Clone, Debug, PartialEq, Parser)]
#[command(
name = "bigwigaverageoverbed",
about = "Gets statistics of a bigWig over a bed.",
long_about = None,
)]
pub struct BigWigAverageOverBedArgs {
pub bigwig: String,
pub bedin: String,
pub output: String,
#[arg(short = 'n', long)]
pub namecol: Option<String>,
#[arg(long)]
pub chrom: Option<String>,
#[arg(long)]
pub start: Option<u32>,
#[arg(long)]
pub end: Option<u32>,
#[arg(long)]
pub min_max: bool,
#[arg(short = 't', long)]
#[arg(default_value_t = 6)]
pub nthreads: usize,
}
pub fn bigwigaverageoverbed(
args: BigWigAverageOverBedArgs,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let bigwigpath = args.bigwig;
let bedinpath = args.bedin;
let bedoutpath = args.output;
let add_min_max = args.min_max;
let reopen = ReopenableFile {
file: File::open(&bigwigpath)?,
path: bigwigpath.into(),
};
let mut inbigwig = BigWigRead::open(reopen)?.cached();
let outbed = File::create(bedoutpath)?;
let mut bedoutwriter: BufWriter<File> = BufWriter::new(outbed);
let name = match args.namecol.as_deref() {
Some("interval") => Name::Interval,
Some("none") => Name::None,
Some(col) => {
let col = col.parse::<usize>();
match col {
Ok(col) if col > 0 => Name::Column(col - 1),
Ok(_) => return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid name column option. Column values are one-indexed, so should not be zero.",
).into()),
Err(_) => return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid name column option. Allowed options are `interval`, `none`, or an one-indexed integer value for a given column.",
).into()),
}
}
None => Name::Column(3),
};
let nthreads: usize = args.nthreads;
let parallel = nthreads > 1;
if parallel {
let chunks = split_file_into_chunks_by_size(File::open(&bedinpath)?, nthreads as u64)?;
fn process_chunk<R: Reopen + BBIFileRead>(
start: u64,
end: u64,
bedinpath: String,
name: Name,
add_min_max: bool,
inbigwig: &mut BigWigRead<R>,
) -> Result<File, Box<dyn Error + Send + Sync>> {
let mut tmp = tempfile::tempfile()?;
let chrom_bed_file = File::open(bedinpath)?;
let chrom_bed_file = FileView::new(chrom_bed_file, start, end)?;
let mut bed_stream = BedFileStream {
bed: StreamingLineReader::new(BufReader::new(chrom_bed_file)),
parse: parse_bed,
};
loop {
let (chrom, entry) = match bed_stream.next() {
None => break,
Some(Err(e)) => {
return Err(e.into());
}
Some(Ok(entry)) => entry,
};
let name = match name_for_bed_item(name, chrom, &entry) {
Ok(name) => name,
Err(e) => {
return Err(e.into());
}
};
let size = entry.end - entry.start;
let entry = match stats_for_bed_item(chrom, entry, inbigwig) {
Ok(stats) => stats,
Err(BBIReadError::InvalidChromosome(..)) => BigWigAverageOverBedEntry {
bases: 0,
max: 0.0,
min: 0.0,
mean: 0.0,
mean0: 0.0,
size,
sum: 0.0,
},
Err(e) => {
return Err(e.into());
}
};
let stats = match add_min_max {
true => format!(
"{}\t{}\t{:.3}\t{:.3}\t{:.3}\t{:.3}\t{:.3}",
entry.size,
entry.bases,
entry.sum,
entry.mean0,
entry.mean,
entry.min,
entry.max
),
false => format!(
"{}\t{}\t{:.3}\t{:.3}\t{:.3}",
entry.size, entry.bases, entry.sum, entry.mean0, entry.mean
),
};
writeln!(&mut tmp, "{}\t{}", name, stats)?
}
Ok(tmp)
}
let mut chunk_data = VecDeque::with_capacity(chunks.len());
let (chunk_data_sender, chunk_data_receiver) = crossbeam_channel::unbounded();
for (start, end) in chunks {
let (result_sender, result_receiver) = crossbeam_channel::bounded(1);
chunk_data.push_back(result_receiver);
chunk_data_sender
.send((start, end, bedinpath.to_string(), result_sender))
.unwrap();
}
drop(chunk_data_sender);
let mut threads = Vec::with_capacity(nthreads);
for _ in 0..(nthreads) {
let inbigwig_ = inbigwig.reopen()?;
let chunk_data_receiver_ = chunk_data_receiver.clone();
let do_process_chrom = move || {
let mut inbigwig = inbigwig_;
let chunk_data_receiver = chunk_data_receiver_;
loop {
let next_chunk = chunk_data_receiver.recv();
let (start, end, bedinpath, result_sender) = match next_chunk {
Ok(n) => n,
Err(_) => break,
};
let result =
process_chunk(start, end, bedinpath, name, add_min_max, &mut inbigwig);
result_sender.send(result).unwrap();
}
};
let join_handle = std::thread::spawn(do_process_chrom);
threads.push(join_handle);
}
while let Some(result_receiver) = chunk_data.pop_front() {
let mut wait = false;
loop {
if !wait {
let result = result_receiver.try_recv();
match result {
Ok(result) => {
let mut tmp = result?;
tmp.seek(SeekFrom::Start(0))?;
io::copy(&mut tmp, &mut bedoutwriter)?;
break;
}
Err(e @ TryRecvError::Disconnected) => return Err(e.into()),
Err(TryRecvError::Empty) => {
let next_chunk = chunk_data_receiver.recv();
let (start, chrom, bedinpath, result_sender) = match next_chunk {
Ok(n) => n,
Err(_) => {
wait = true;
continue;
}
};
let result = process_chunk(
start,
chrom,
bedinpath,
name,
add_min_max,
&mut inbigwig,
);
result_sender.send(result).unwrap();
}
}
} else {
let result = result_receiver.recv();
match result {
Ok(result) => {
let mut tmp = result?;
tmp.seek(SeekFrom::Start(0))?;
io::copy(&mut tmp, &mut bedoutwriter)?;
break;
}
Err(e) => return Err(e.into()),
}
}
}
}
} else {
let bed = File::open(&bedinpath)?;
let bedin: BufReader<File> = BufReader::new(bed);
let mut bedstream = StreamingLineReader::new(bedin);
loop {
let line = match bedstream.read() {
None => break,
Some(Err(e)) => {
return Err(e.into());
}
Some(Ok(line)) => line,
};
let (chrom, entry) = parse_bed(line).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"Invalid bed: A minimum of 3 columns must be specified (chrom, start, end).",
)
})??;
let name = match name_for_bed_item(name, chrom, &entry) {
Ok(name) => name,
Err(e) => {
return Err(e.into());
}
};
let entry = match stats_for_bed_item(chrom, entry, &mut inbigwig) {
Ok(stats) => stats,
Err(e) => return Err(e.into()),
};
let stats = match add_min_max {
true => format!(
"{}\t{}\t{:.3}\t{:.3}\t{:.3}\t{:.3}\t{:.3}",
entry.size,
entry.bases,
entry.sum,
entry.mean0,
entry.mean,
entry.min,
entry.max
),
false => format!(
"{}\t{}\t{:.3}\t{:.3}\t{:.3}",
entry.size, entry.bases, entry.sum, entry.mean0, entry.mean
),
};
writeln!(&mut bedoutwriter, "{}\t{}", name, stats)?
}
}
Ok(())
}