use std::error::Error;
use std::fs::File;
use std::io::{self, BufReader, Write};
use std::path::Path;
use clap::Parser;
use futures::{SinkExt, StreamExt};
use tokio::runtime;
use ufmt::uwrite;
use crate::utils::reopen::{Reopen, SeekableRead};
use crate::utils::streaming_linereader::StreamingLineReader;
use crate::utils::tempfilebuffer::{TempFileBuffer, TempFileBufferWriter};
use crate::{BBIReadError, BigBedRead, ChromInfo};
/// Command-line arguments for the `bigbedtobed` tool.
#[derive(Clone, Debug, PartialEq, Parser)]
#[command(
    name = "bigbedtobed",
    about = "Converts an input bigBed to a bed.",
    long_about = "Converts an input bigBed to a bed. Can be multi-threaded for substantial speedups. Note for roughly each core, one temporary file will be opened."
)]
pub struct BigBedToBedArgs {
    /// The bigBed file to convert.
    pub big_bed: String,

    /// The path of the bed file to write.
    pub bed: String,

    /// If set, restrict output to the given chromosome.
    #[arg(long)]
    pub chrom: Option<String>,

    /// If set, start of the region on --chrom to output (defaults to 0). Requires --chrom.
    #[arg(long)]
    pub start: Option<u32>,

    /// If set, end of the region on --chrom to output (defaults to the chromosome length). Requires --chrom.
    #[arg(long)]
    pub end: Option<u32>,

    /// If set, restrict output to entries overlapping the regions listed in this bed file, clipped to each region.
    // NOTE(review): without `#[arg(long)]` this is the third *positional* argument, yet
    // `bigbedtobed` reports conflicts in terms of `--overlap-bed`; confirm which is intended.
    pub overlap_bed: Option<String>,

    /// Number of threads for the multi-threaded conversion, used when none of --chrom, --zoom, or an overlap bed is given. 1 forces single-threaded conversion.
    #[arg(short = 't', long)]
    #[arg(default_value_t = 6)]
    pub nthreads: usize,

    /// Do not create temporary files for intermediate data (only affects the multi-threaded conversion).
    #[arg(long)]
    #[arg(default_value_t = false)]
    pub inmemory: bool,

    /// If set, output the summary records of the zoom level with this reduction level instead of bed entries.
    #[arg(long)]
    pub zoom: Option<u32>,
}
/// Entry point for the `bigbedtobed` command: converts `args.big_bed` into bed text at `args.bed`.
///
/// Dispatches to `write_bed_from_bed` when an overlap bed is given, to
/// `write_bed_singlethreaded` when one thread, `--chrom`, or `--zoom` is requested,
/// and to the multi-threaded `write_bed` otherwise.
///
/// Invalid argument combinations and a missing overlap bed are reported on stderr and
/// return `Ok(())`. These checks run (and the overlap bed is opened) *before* the output
/// file is created, so a bad invocation never truncates an existing output file.
///
/// # Errors
/// Propagates failures opening the overlap bed or the bigBed, creating the output file,
/// or performing the conversion.
pub fn bigbedtobed(args: BigBedToBedArgs) -> Result<(), Box<dyn Error>> {
    if (args.start.is_some() || args.end.is_some()) && args.chrom.is_none() {
        eprintln!("Cannot specify --start or --end without specifying --chrom.");
        return Ok(());
    }
    if args.chrom.is_some() && args.overlap_bed.is_some() {
        eprintln!("Cannot specify both --overlap-bed and interval to overlap.");
        return Ok(());
    }
    // Open the overlap bed up front so a missing/unreadable file fails before any output exists.
    let overlap_bed = match args.overlap_bed.as_deref() {
        Some(path) if !Path::new(path).exists() => {
            eprintln!("Overlap bed file does not exist.");
            return Ok(());
        }
        Some(path) => Some(File::open(path)?),
        None => None,
    };

    let bigbed = BigBedRead::open_file(&args.big_bed)?;
    let bed = File::create(&args.bed)?;

    match overlap_bed {
        Some(overlap_bed) => write_bed_from_bed(bigbed, bed, overlap_bed)?,
        None if args.nthreads == 1 || args.chrom.is_some() || args.zoom.is_some() => {
            write_bed_singlethreaded(bigbed, bed, args.chrom, args.start, args.end, args.zoom)?
        }
        None => write_bed(bigbed, bed, args.inmemory, args.nthreads)?,
    }
    Ok(())
}
/// Writes `bigbed` as tab-separated bed text to `out_file` on the current thread.
///
/// When `chrom` is given, only that chromosome is written, querying `start..end`
/// (defaulting to `0` and the chromosome length). `start` and `end` are ignored when
/// `chrom` is `None`.
///
/// When `zoom` is given, the zoom-level records for that reduction level are written
/// instead of bed entries, one per line as
/// `chrom, start, end, total_items, bases_covered, min, max, sum, sum_squares`.
///
/// A `chrom` or `zoom` level not present in the file is reported on stderr and
/// returns `Ok(())`.
///
/// # Errors
/// Returns read errors from the bigBed and write/flush errors on `out_file`.
pub fn write_bed_singlethreaded<R: Reopen + SeekableRead>(
    mut bigbed: BigBedRead<R>,
    out_file: File,
    chrom: Option<String>,
    start: Option<u32>,
    end: Option<u32>,
    zoom: Option<u32>,
) -> Result<(), Box<dyn Error>> {
    // A region is only meaningful together with a chromosome.
    let start = chrom.as_ref().and_then(|_| start);
    let end = chrom.as_ref().and_then(|_| end);
    let chroms: Vec<ChromInfo> = match chrom {
        Some(arg_chrom) => {
            let Some(chrom) = bigbed.chroms().iter().find(|c| c.name == arg_chrom) else {
                eprintln!("Error: {arg_chrom} not found in file.");
                return Ok(());
            };
            vec![chrom.clone()]
        }
        None => bigbed.chroms().to_vec(),
    };

    let mut writer = io::BufWriter::with_capacity(32 * 1000, out_file);
    // One line buffer reused for every record.
    let mut buf: String = String::with_capacity(50);
    if let Some(zoom) = zoom {
        if bigbed
            .info()
            .zoom_headers
            .iter()
            .all(|z| z.reduction_level != zoom)
        {
            eprintln!("Error: Zoom level not found: {zoom}");
            return Ok(());
        }
        for chrom in chroms {
            let start = start.unwrap_or(0);
            let end = end.unwrap_or(chrom.length);
            for raw_val in bigbed.get_zoom_interval(&chrom.name, start, end, zoom)? {
                let val = raw_val?;
                // `ufmt` has no float formatting, so the float summaries go through `to_string`.
                uwrite!(
                    &mut buf,
                    "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
                    chrom.name,
                    val.start,
                    val.end,
                    val.summary.total_items,
                    val.summary.bases_covered,
                    val.summary.min_val.to_string(),
                    val.summary.max_val.to_string(),
                    val.summary.sum.to_string(),
                    val.summary.sum_squares.to_string()
                )?;
                // `write` may write only part of the buffer; `write_all` does not.
                writer.write_all(buf.as_bytes())?;
                buf.clear();
            }
        }
    } else {
        for chrom in chroms {
            let start = start.unwrap_or(0);
            let end = end.unwrap_or(chrom.length);
            for raw_val in bigbed.get_interval(&chrom.name, start, end)? {
                let val = raw_val?;
                if val.rest.is_empty() {
                    uwrite!(&mut buf, "{}\t{}\t{}\n", chrom.name, val.start, val.end).unwrap();
                } else {
                    uwrite!(
                        &mut buf,
                        "{}\t{}\t{}\t{}\n",
                        chrom.name,
                        val.start,
                        val.end,
                        val.rest
                    )
                    .unwrap();
                }
                writer.write_all(buf.as_bytes())?;
                buf.clear();
            }
        }
    }
    // Dropping a BufWriter swallows flush errors; flush explicitly so they are reported.
    writer.flush()?;
    Ok(())
}
/// Converts every chromosome of `bigbed` to bed text using a multi-threaded tokio runtime.
///
/// A producer task reopens `bigbed` once per chromosome (in `bigbed.chroms()` order),
/// spawns a task that writes that chromosome's entries into a `TempFileBuffer`
/// (created with `inmemory`), and queues both the task handle and the buffer. The
/// calling thread hands `out_file` to each buffer in queue order (`switch` /
/// `await_real_file`), so chromosomes appear in the same order as `bigbed.chroms()`.
///
/// `nthreads` sets the worker count and the capacity of the task-handle channel;
/// values below 1 are treated as 1 (tokio panics on zero worker threads).
///
/// # Errors
/// Returns the first error from building the runtime, reading/writing any chromosome,
/// or reopening `bigbed` for a chromosome.
pub fn write_bed<R: Reopen + SeekableRead + Send + 'static>(
    bigbed: BigBedRead<R>,
    mut out_file: File,
    inmemory: bool,
    nthreads: usize,
) -> Result<(), BBIReadError> {
    let nthreads = nthreads.max(1);
    let runtime = runtime::Builder::new_multi_thread()
        .worker_threads(nthreads)
        .build()?;
    let chroms = bigbed.chroms().to_vec();

    /// Writes all entries of `chrom` into `writer`, flushing it before returning.
    async fn file_future<R: SeekableRead + 'static>(
        mut bigbed: BigBedRead<R>,
        chrom: ChromInfo,
        mut writer: io::BufWriter<TempFileBufferWriter<File>>,
    ) -> Result<(), BBIReadError> {
        let mut buf: String = String::with_capacity(50);
        for raw_val in bigbed.get_interval(&chrom.name, 0, chrom.length)? {
            let val = raw_val?;
            if val.rest.is_empty() {
                uwrite!(&mut buf, "{}\t{}\t{}\n", chrom.name, val.start, val.end).unwrap();
            } else {
                uwrite!(
                    &mut buf,
                    "{}\t{}\t{}\t{}\n",
                    chrom.name,
                    val.start,
                    val.end,
                    val.rest
                )
                .unwrap();
            }
            // `write` may write only part of the buffer; `write_all` does not.
            writer.write_all(buf.as_bytes())?;
            buf.clear();
        }
        // Dropping a BufWriter swallows flush errors, which would silently truncate output.
        writer.flush()?;
        Ok(())
    }

    let (mut handle_snd, mut handle_rcv) = futures::channel::mpsc::channel(nthreads);
    let (mut buf_snd, mut buf_rcv) = futures::channel::mpsc::unbounded();
    // Keep the producer's handle: its errors (e.g. a failed reopen) must not be lost.
    let producer = runtime.spawn(async move {
        for chrom in chroms {
            let bigbed = bigbed.reopen()?;
            let (buf, file): (TempFileBuffer<File>, TempFileBufferWriter<File>) =
                TempFileBuffer::new(inmemory);
            let writer = io::BufWriter::new(file);
            let handle = tokio::task::spawn(file_future(bigbed, chrom, writer));
            // A closed channel means a consumer already stopped with an error to report;
            // stop producing instead of panicking.
            if handle_snd.send(handle).await.is_err() || buf_snd.send(buf).await.is_err() {
                break;
            }
        }
        Ok::<_, BBIReadError>(())
    });
    let data_handle = runtime.spawn(async move {
        while let Some(handle) = handle_rcv.next().await {
            handle.await.unwrap()?;
        }
        Ok::<_, BBIReadError>(())
    });
    runtime.block_on(async move {
        while let Some(mut buf) = buf_rcv.next().await {
            buf.switch(out_file);
            while !buf.is_real_file_ready() {
                tokio::task::yield_now().await;
            }
            out_file = buf.await_real_file();
        }
        data_handle.await.unwrap()?;
        producer.await.unwrap()?;
        Ok::<_, BBIReadError>(())
    })?;
    Ok(())
}
/// Writes the entries of `bigbed` that overlap each region in the `bed` file to `out_file`.
///
/// Each line of `bed` is read as tab-separated `chrom, start, end[, ...]`; only the first
/// three columns are used. Every entry returned for a region is clipped to that region's
/// bounds before being written, so an entry overlapping several regions is written once
/// per region. Blank lines and lines starting with `#` are skipped.
///
/// # Errors
/// Returns an `InvalidData` I/O error (converted to `BBIReadError`) for a line with a
/// missing or non-numeric start/end, plus any read error from either file or write/flush
/// error on `out_file`.
pub fn write_bed_from_bed<R: Reopen + SeekableRead + Send + 'static>(
    mut bigbed: BigBedRead<R>,
    out_file: File,
    bed: File,
) -> Result<(), BBIReadError> {
    /// Parses a required coordinate column, reporting a missing or non-numeric value as `InvalidData`.
    fn parse_coord(field: Option<&str>, what: &str, line: &str) -> io::Result<u32> {
        let field = field.ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Missing {what} in overlap bed line: {line}"),
            )
        })?;
        field.parse::<u32>().map_err(|e| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid {what} `{field}` in overlap bed line: {line} ({e})"),
            )
        })
    }

    let mut bedstream = StreamingLineReader::new(BufReader::new(bed));
    let mut writer = io::BufWriter::new(out_file);
    // One line buffer reused for every record.
    let mut buf = String::with_capacity(50);
    while let Some(line) = bedstream.read() {
        let line = line?;
        let line = line.trim();
        // Blank and comment lines carry no region; previously they caused a panic.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let mut split = line.splitn(5, '\t');
        let chrom = split
            .next()
            .expect("splitn always yields at least one item");
        let start = parse_coord(split.next(), "start", line)?;
        let end = parse_coord(split.next(), "end", line)?;
        for raw_val in bigbed.get_interval(chrom, start, end)? {
            let mut val = raw_val?;
            // Clip each entry to the requested region.
            val.start = val.start.max(start);
            val.end = val.end.min(end);
            if val.rest.is_empty() {
                uwrite!(&mut buf, "{}\t{}\t{}\n", chrom, val.start, val.end).unwrap();
            } else {
                uwrite!(
                    &mut buf,
                    "{}\t{}\t{}\t{}\n",
                    chrom,
                    val.start,
                    val.end,
                    val.rest
                )
                .unwrap();
            }
            // `write` may write only part of the buffer; `write_all` does not.
            writer.write_all(buf.as_bytes())?;
            buf.clear();
        }
    }
    // Dropping a BufWriter swallows flush errors; flush explicitly so they are reported.
    writer.flush()?;
    Ok(())
}