use std::io::{BufRead, BufReader};
use std::{collections::HashMap, error::Error, fs::File, path::PathBuf};
use clap::Parser;
use tokio::runtime;
use crate::bed::bedparser::parse_bedgraph;
use crate::bed::indexer::index_chroms;
use crate::beddata::{BedParserParallelStreamingIterator, BedParserStreamingIterator};
use crate::{BigWigWrite, InputSortType};
use super::BBIWriteArgs;
/// Command-line arguments for the `bedgraphtobigwig` subcommand.
///
/// The doc comments on the fields below are surfaced by clap as the
/// per-argument help text; the command-level description comes from the
/// explicit `about`/`long_about` attributes.
#[derive(Clone, Debug, PartialEq, Parser)]
#[command(
    name = "bedgraphtobigwig",
    about = "Converts an input bedGraph to a bigWig.",
    long_about = "Converts an input bedGraph to a bigWig. Can be multi-threaded for substantial speedups. Note that ~11 temporary files are created/maintained."
)]
pub struct BedGraphToBigWigArgs {
    /// Path of the bedGraph to convert. Use `-`, `stdin`, or `/dev/stdin` to read from standard input.
    pub bedgraph: String,

    /// Path of the chromosome sizes file. Each non-empty line holds a chromosome name and its size, separated by whitespace.
    pub chromsizes: String,

    /// Path of the bigWig file to write.
    pub output: String,

    /// Whether to read the bedGraph in parallel: `auto`, `yes`, or `no`. `auto` goes parallel for inputs of at least 200000000 bytes when the file can be indexed; `yes` fails if it cannot be indexed. Ignored when a single thread is used or when reading from standard input.
    #[arg(short = 'p', long, default_value = "auto")]
    pub parallel: String,

    /// Process the input file in a single pass instead of multiple passes. Input from standard input is always processed in a single pass.
    #[arg(long, default_value_t = false)]
    pub single_pass: bool,

    // Shared writer options (threads, zooms, compression, sort type, ...).
    #[command(flatten)]
    pub write_args: BBIWriteArgs,
}
/// Parses a chromosome sizes listing into a map from chromosome name to size.
///
/// Each line is split on whitespace: the first field is the chromosome name
/// and the second its size (a `u32`). Further fields are ignored, blank or
/// whitespace-only lines are skipped, and a repeated name keeps its last size.
///
/// # Errors
///
/// Returns an error if a line cannot be read, has no size field, or has a
/// size that does not parse as `u32`. Messages include the 1-based line number.
fn read_chrom_sizes<R: BufRead>(reader: R) -> Result<HashMap<String, u32>, Box<dyn Error>> {
    let mut sizes = HashMap::new();
    for (idx, line) in reader.lines().enumerate() {
        let line = line?;
        let mut fields = line.split_whitespace();
        let chrom = match fields.next() {
            Some(chrom) => chrom,
            // Nothing but whitespace on this line: no entry to record.
            None => continue,
        };
        let raw_size = fields.next().ok_or_else(|| {
            format!(
                "Invalid chrom sizes file: line {} has no size for `{}`.",
                idx + 1,
                chrom
            )
        })?;
        let size = raw_size.parse::<u32>().map_err(|e| {
            format!(
                "Invalid chrom sizes file: line {} has invalid size `{}` for `{}`: {}",
                idx + 1,
                raw_size,
                chrom,
                e
            )
        })?;
        sizes.insert(chrom.to_owned(), size);
    }
    Ok(sizes)
}

/// Converts the bedGraph named in `args` into a bigWig file.
///
/// The chromosome sizes file is read first, then a `BigWigWrite` is created
/// for the output path and configured from `args.write_args`. The input is
/// then written in one of three ways:
///
/// * standard input (`-`, `stdin`, `/dev/stdin`): always a single streaming pass;
/// * a file that was indexed by `index_chroms`: the parallel streaming iterator;
/// * any other file: the sequential streaming iterator.
///
/// For file input, `args.single_pass` selects `write` over `write_multipass`.
///
/// # Errors
///
/// Returns an error when:
///
/// * `sorted` is `none` (not implemented) or not one of `all`/`start`/`none`;
/// * the chromosome sizes file cannot be opened, read, or parsed;
/// * the output writer or the tokio runtime cannot be created;
/// * the input file cannot be opened or inspected;
/// * `parallel` is `yes` but `index_chroms` produces no index;
/// * indexing or writing fails.
pub fn bedgraphtobigwig(args: BedGraphToBigWigArgs) -> Result<(), Box<dyn Error>> {
    // Input size in bytes at or above which `auto` tries the parallel path.
    const PARALLEL_AUTO_MIN_BYTES: u64 = 200_000_000;

    let bedgraphpath = args.bedgraph;
    let bigwigpath = args.output;
    let nthreads = args.write_args.nthreads;

    // Reject unusable sort settings with an error so callers see the failure
    // instead of a successful return with no output written.
    let input_sort_type = match args.write_args.sorted.as_ref() {
        "all" => InputSortType::ALL,
        "start" => InputSortType::START,
        "none" => {
            return Err("Using completely unsorted input is not implemented yet.".into());
        }
        sorted => {
            return Err(format!(
                "Invalid option for `sorted`: `{}`. Options are `all`, `start`, or `none`.",
                sorted
            )
            .into());
        }
    };

    let chrom_map = read_chrom_sizes(BufReader::new(File::open(args.chromsizes)?))?;

    let mut outb = BigWigWrite::create_file(bigwigpath, chrom_map)?;
    outb.options.max_zooms = args.write_args.nzooms;
    outb.options.manual_zoom_sizes = args.write_args.zooms;
    outb.options.compress = !args.write_args.uncompressed;
    outb.options.input_sort_type = input_sort_type;
    outb.options.block_size = args.write_args.block_size;
    outb.options.inmemory = args.write_args.inmemory;

    // One thread: current-thread runtime with the channel size set to 0.
    // Otherwise: multi-thread runtime with `nthreads` workers.
    let runtime = if nthreads == 1 {
        outb.options.channel_size = 0;
        runtime::Builder::new_current_thread().build()?
    } else {
        runtime::Builder::new_multi_thread()
            .worker_threads(nthreads)
            .build()?
    };

    // Chromosomes may appear out of order unless the input is declared fully sorted.
    let allow_out_of_order_chroms = !matches!(outb.options.input_sort_type, InputSortType::ALL);

    if bedgraphpath == "-" || bedgraphpath == "stdin" || bedgraphpath == "/dev/stdin" {
        // Standard input: one streaming pass, no indexing.
        let stdin = std::io::stdin().lock();
        let vals = BedParserStreamingIterator::from_bedgraph_file(stdin, allow_out_of_order_chroms);
        outb.write(vals, runtime)?;
        return Ok(());
    }

    let infile = File::open(&bedgraphpath)?;

    // `parallel`: attempt to index the file for parallel reading.
    // `parallel_required`: failing to index is an error rather than a fallback.
    let (parallel, parallel_required) = match (nthreads, args.parallel.as_ref()) {
        (1, _) | (_, "no") => (false, false),
        (_, "yes") => (true, true),
        (_, mode) => {
            if mode != "auto" {
                eprintln!(
                    "Unexpected value for `parallel`: \"{}\". Defaulting to `auto`.",
                    mode
                );
            }
            (infile.metadata()?.len() >= PARALLEL_AUTO_MIN_BYTES, false)
        }
    };

    let chrom_indices = if parallel {
        match index_chroms(infile)? {
            Some(index) => Some(index),
            None if parallel_required => {
                return Err(
                    "Parallel conversion requires a sorted bedGraph file. Cancelling.".into(),
                );
            }
            // Not required: fall back to the sequential path below.
            None => None,
        }
    } else {
        None
    };

    match chrom_indices {
        Some(chrom_indices) => {
            if args.single_pass {
                let data = BedParserParallelStreamingIterator::new(
                    chrom_indices,
                    allow_out_of_order_chroms,
                    PathBuf::from(bedgraphpath),
                    parse_bedgraph,
                );
                outb.write(data, runtime)?;
            } else {
                // The closure builds a fresh iterator each time it is called.
                outb.write_multipass(
                    || {
                        let data = BedParserParallelStreamingIterator::new(
                            chrom_indices.clone(),
                            allow_out_of_order_chroms,
                            PathBuf::from(bedgraphpath.clone()),
                            parse_bedgraph,
                        );
                        Ok(data)
                    },
                    runtime,
                )?;
            }
        }
        None => {
            if args.single_pass {
                let infile = File::open(&bedgraphpath)?;
                let vals = BedParserStreamingIterator::from_bedgraph_file(
                    infile,
                    allow_out_of_order_chroms,
                );
                outb.write(vals, runtime)?;
            } else {
                // The closure reopens the file each time it is called.
                outb.write_multipass(
                    || {
                        let infile = File::open(&bedgraphpath)?;
                        Ok(BedParserStreamingIterator::from_bedgraph_file(
                            infile,
                            allow_out_of_order_chroms,
                        ))
                    },
                    runtime,
                )?;
            }
        }
    }

    Ok(())
}