use anyhow::Result;
use colored::Colorize;
use gzp::deflate::Bgzf; use gzp::{Compression, ZBuilder};
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use lazy_static::lazy_static;
use niffler::get_reader;
use rayon::current_num_threads;
use regex::Regex;
use rust_htslib::bam;
use rust_htslib::bam::record::Aux;
use rust_htslib::bam::Read;
use std::collections::HashMap;
use std::env;
use std::ffi::OsStr;
use std::fs::File;
use std::io::{self, stdout, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::time::Instant;
const BUFFER_SIZE: usize = 32 * 1024;
const COMPRESSION_THREADS: usize = 8;
const COMPRESSION_LEVEL: u32 = 6;
pub fn no_length_progress_bar() -> ProgressBar {
let progress_style_no_length = format!(
"{} {} {} {} {} {}",
"[{elapsed_precise:.yellow.bold}]",
"Records".cyan(),
"{per_sec:<15.cyan}",
"Read".blue(),
"{human_pos:.blue}",
"records".blue(),
);
let style = ProgressStyle::default_bar()
.template(&progress_style_no_length)
.unwrap();
let bar = ProgressBar::new(0);
bar.set_style(style);
let finish = indicatif::ProgressFinish::AndLeave;
let bar = bar.with_finish(finish);
bar
}
fn get_output(path: Option<PathBuf>) -> Result<Box<dyn Write + Send + 'static>> {
let writer: Box<dyn Write + Send + 'static> = match path {
Some(path) => {
if path.as_os_str() == "-" {
Box::new(BufWriter::with_capacity(BUFFER_SIZE, stdout()))
} else {
Box::new(BufWriter::with_capacity(BUFFER_SIZE, File::create(path)?))
}
}
None => Box::new(BufWriter::with_capacity(BUFFER_SIZE, stdout())),
};
Ok(writer)
}
pub fn writer(filename: &str) -> Result<Box<dyn Write>> {
let ext = Path::new(filename).extension();
let path = PathBuf::from(filename);
let buffer = get_output(Some(path))?;
if ext == Some(OsStr::new("gz")) || ext == Some(OsStr::new("bgz")) {
let writer = ZBuilder::<Bgzf, _>::new()
.num_threads(COMPRESSION_THREADS)
.compression_level(Compression::new(COMPRESSION_LEVEL))
.from_writer(buffer);
Ok(Box::new(writer))
} else {
Ok(buffer)
}
}
pub fn write_to_file(out: &str, buffer: &mut Box<dyn Write>) {
let out = write!(buffer, "{}", out);
if let Err(err) = out {
if err.kind() == io::ErrorKind::BrokenPipe {
exit(0);
} else {
panic!("Error: {}", err);
}
}
}
pub fn buffer_from<P: AsRef<Path>>(
path: P,
) -> Result<BufReader<Box<dyn std::io::Read>>, anyhow::Error> {
let path = path.as_ref();
let readable: Box<dyn std::io::Read> = if path == Path::new("-") {
Box::new(io::BufReader::new(io::stdin()))
} else {
Box::new(io::BufReader::new(std::fs::File::open(path)?))
};
let (reader, compression) = get_reader(readable)?;
log::debug!("Compression: {:?}", compression);
let buffer = BufReader::new(reader);
Ok(buffer)
}
pub fn program_bam_writer(
out: &str,
template_bam: &bam::Reader,
threads: usize,
program_name: &str,
program_id: &str,
program_version: &str,
) -> bam::Writer {
let mut header = bam::Header::from_template(template_bam.header());
let header_string = String::from_utf8_lossy(&header.to_bytes()).to_string();
let mut header_rec = bam::header::HeaderRecord::new(b"PG");
let tag = format!("PN:{program_name}");
let program_count = header_string.matches(&tag).count();
let id_tag = format!("{}.{}", program_id, program_count + 1);
header_rec.push_tag(b"ID", &id_tag);
header_rec.push_tag(b"PN", program_name);
let re_pp = Regex::new(r"@PG\tID:([^\t]+)").unwrap();
let last_program = re_pp.captures_iter(&header_string).last();
if let Some(last_program) = last_program {
let last_program = last_program[1].to_string();
log::trace!("last program {}", last_program);
header_rec.push_tag(b"PP", &last_program);
};
header_rec.push_tag(b"VN", program_version);
let cli = env::args().join(" ");
header_rec.push_tag(b"CL", &cli);
header.push_record(&header_rec);
log::trace!("{:?}", String::from_utf8_lossy(&header.to_bytes()));
let mut out = if out == "-" {
bam::Writer::from_stdout(&header, bam::Format::Bam).unwrap()
} else {
bam::Writer::from_path(out, &header, bam::Format::Bam).unwrap()
};
out.set_threads(threads).unwrap();
out
}
pub fn bam_reader(bam: &str, threads: usize) -> bam::Reader {
let mut bam = if bam == "-" {
bam::Reader::from_stdin().unwrap_or_else(|_| panic!("Failed to open bam from stdin"))
} else {
bam::Reader::from_path(bam).unwrap_or_else(|_| panic!("Failed to open {}", bam))
};
bam.set_threads(threads).unwrap();
bam
}
pub struct BamChunk<'a> {
pub bam: bam::Records<'a, bam::Reader>,
pub chunk_size: usize,
pub pre_chunk_done: u64,
pub bar: ProgressBar,
}
impl<'a> BamChunk<'a> {
pub fn new(bam: bam::Records<'a, bam::Reader>, chunk_size: Option<usize>) -> Self {
let chunk_size = chunk_size.unwrap_or_else(|| current_num_threads() * 100);
let bar = no_length_progress_bar();
Self {
bam,
chunk_size,
pre_chunk_done: 0,
bar,
}
}
}
impl<'a> Iterator for BamChunk<'a> {
type Item = Vec<bam::Record>;
fn next(&mut self) -> Option<Self::Item> {
self.bar.inc(self.pre_chunk_done);
let start = Instant::now();
let mut cur_vec = vec![];
for r in self.bam.by_ref().take(self.chunk_size) {
let r = r.unwrap();
cur_vec.push(r);
}
self.pre_chunk_done = cur_vec.len() as u64;
self.bar.inc_length(self.pre_chunk_done);
if cur_vec.is_empty() {
None
} else {
let duration = start.elapsed().as_secs_f64();
log::debug!(
"Read {} bam records at {}.",
format!("{:}", cur_vec.len()).bright_magenta().bold(),
format!("{:.2?} reads/s", cur_vec.len() as f64 / duration)
.bright_cyan()
.bold(),
);
Some(cur_vec)
}
}
}
#[derive(Clone, Debug)]
pub enum PbChem {
Two,
TwoPointTwo,
ThreePointTwo,
Revio,
}
pub fn find_pb_polymerase(header: &bam::Header) -> PbChem {
lazy_static! {
static ref CHEMISTRY_MAP: HashMap<String, PbChem> = HashMap::from([
("101-789-500".to_string(), PbChem::Two),
("101-820-500".to_string(), PbChem::Two),
("101-894-200".to_string(), PbChem::TwoPointTwo),
("102-194-200".to_string(), PbChem::Two),
("102-194-100".to_string(), PbChem::ThreePointTwo),
("102-739-100".to_string(), PbChem::Revio)
]);
}
lazy_static! {
static ref MM_DS: regex::Regex =
regex::Regex::new(r".*READTYPE=([^;]+);.*BINDINGKIT=([^;]+);").unwrap();
}
let z = header.to_hashmap();
let rg = z.get("RG").expect("RG tag missing from bam file");
let mut read_type = "";
let mut binding_kit = "";
for tag in rg {
for (tag, val) in tag {
if tag == "DS" {
for cap in MM_DS.captures_iter(val) {
read_type = cap.get(1).map_or("", |m| m.as_str());
binding_kit = cap.get(2).map_or("", |m| m.as_str());
}
}
}
}
if env::var("FT_REVIO").is_ok() {
binding_kit = "102-739-100";
}
assert_eq!(read_type, "CCS");
let chemistry = CHEMISTRY_MAP.get(binding_kit).unwrap_or_else(|| {
log::warn!(
"Polymerase for BINDINGKIT={} not found. Defaulting to ML model made for REVIO.",
binding_kit
);
&PbChem::Revio
});
let chem = match chemistry {
PbChem::Two => "2.0",
PbChem::TwoPointTwo => "2.2",
PbChem::ThreePointTwo => "3.2",
PbChem::Revio => "Revio",
};
log::info!(
"Bam header implies PacBio chemistry {} binding kit {}.",
chem,
binding_kit
);
chemistry.clone()
}
pub fn get_u32_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<i64> {
if let Ok(Aux::ArrayU32(array)) = record.aux(tag) {
let read_array = array.iter().map(|x| x as i64).collect::<Vec<_>>();
read_array
} else {
vec![]
}
}
pub fn get_u8_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<u8> {
if let Ok(Aux::ArrayU8(array)) = record.aux(tag) {
let read_array = array.iter().collect::<Vec<_>>();
read_array
} else {
vec![]
}
}
pub fn get_pb_u16_tag_as_u8(record: &bam::Record, tag: &[u8; 2]) -> Vec<u8> {
if let Ok(Aux::ArrayU16(array)) = record.aux(tag) {
let read_array = array.iter().collect::<Vec<_>>();
read_array
.iter()
.map(|&x| {
if x < 64 {
x
} else if x < 191 {
(x - 64) / 2 + 64
} else if x < 445 {
(x - 192) / 4 + 128
} else if x < 953 {
(x - 448) / 8 + 192
} else {
255
}
})
.map(|x| x as u8)
.collect()
} else {
vec![]
}
}
pub fn get_f32_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<f32> {
if let Ok(Aux::ArrayFloat(array)) = record.aux(tag) {
let read_array = array.iter().collect::<Vec<_>>();
read_array
} else {
vec![]
}
}