1use anyhow::Result;
2use colored::Colorize;
3use gzp::deflate::Bgzf; use gzp::{Compression, ZBuilder};
5use indicatif::{ProgressBar, ProgressStyle};
6use itertools::Itertools;
7use lazy_static::lazy_static;
8use niffler::get_reader;
9use rayon::current_num_threads;
10use regex::Regex;
11use rust_htslib::bam;
12use rust_htslib::bam::record::Aux;
13use rust_htslib::bam::Read;
14use std::collections::HashMap;
15use std::env;
16use std::ffi::OsStr;
17use std::fs::File;
18use std::io::{self, stdout, BufReader, BufWriter, Write};
19use std::path::{Path, PathBuf};
20use std::process::exit;
21use std::time::Instant;
22
23const BUFFER_SIZE: usize = 32 * 1024;
24const COMPRESSION_THREADS: usize = 8;
25const COMPRESSION_LEVEL: u32 = 6;
26
27pub fn no_length_progress_bar() -> ProgressBar {
31 let progress_style_no_length = format!(
32 "{} {} {} {} {} {}",
33 "[{elapsed_precise:.yellow.bold}]",
34 "Records".cyan(),
35 "{per_sec:<15.cyan}",
36 "Read".blue(),
37 "{human_pos:.blue}",
38 "records".blue(),
39 );
40 let style = ProgressStyle::default_bar()
41 .template(&progress_style_no_length)
42 .unwrap();
43 let bar = ProgressBar::new(0);
44 bar.set_style(style);
45 let finish = indicatif::ProgressFinish::AndLeave;
46 let bar = bar.with_finish(finish);
47 bar
48}
49
50fn get_output(path: Option<PathBuf>) -> Result<Box<dyn Write + Send + 'static>> {
56 let writer: Box<dyn Write + Send + 'static> = match path {
57 Some(path) => {
58 if path.as_os_str() == "-" {
59 Box::new(BufWriter::with_capacity(BUFFER_SIZE, stdout()))
60 } else {
61 Box::new(BufWriter::with_capacity(BUFFER_SIZE, File::create(path)?))
62 }
63 }
64 None => Box::new(BufWriter::with_capacity(BUFFER_SIZE, stdout())),
65 };
66 Ok(writer)
67}
68
69pub fn writer(filename: &str) -> Result<Box<dyn Write>> {
73 let ext = Path::new(filename).extension();
74 let path = PathBuf::from(filename);
75 let buffer = get_output(Some(path))?;
76 if ext == Some(OsStr::new("gz")) || ext == Some(OsStr::new("bgz")) {
77 let writer = ZBuilder::<Bgzf, _>::new()
78 .num_threads(COMPRESSION_THREADS)
79 .compression_level(Compression::new(COMPRESSION_LEVEL))
80 .from_writer(buffer);
81 Ok(Box::new(writer))
82 } else {
83 Ok(buffer)
84 }
85}
86
87pub fn write_to_file(out: &str, buffer: &mut Box<dyn Write>) {
89 let out = write!(buffer, "{}", out);
90 if let Err(err) = out {
91 if err.kind() == io::ErrorKind::BrokenPipe {
92 exit(0);
93 } else {
94 panic!("Error: {}", err);
95 }
96 }
97}
98
99pub fn buffer_from<P: AsRef<Path>>(
111 path: P,
112) -> Result<BufReader<Box<dyn std::io::Read>>, anyhow::Error> {
113 let path = path.as_ref();
114 let readable: Box<dyn std::io::Read> = if path == Path::new("-") {
115 Box::new(io::BufReader::new(io::stdin()))
116 } else {
117 Box::new(io::BufReader::new(std::fs::File::open(path)?))
118 };
119 let (reader, compression) = get_reader(readable)?;
120 log::debug!("Compression: {:?}", compression);
121 let buffer = BufReader::new(reader);
122 Ok(buffer)
123}
124
125pub fn program_bam_writer(
131 out: &str,
132 template_bam: &bam::Reader,
133 threads: usize,
134 program_name: &str,
135 program_id: &str,
136 program_version: &str,
137) -> bam::Writer {
138 let mut header = bam::Header::from_template(template_bam.header());
139
140 let header_string = String::from_utf8_lossy(&header.to_bytes()).to_string();
142 let mut header_rec = bam::header::HeaderRecord::new(b"PG");
143 let tag = format!("PN:{program_name}");
145 let program_count = header_string.matches(&tag).count();
146 let id_tag = format!("{}.{}", program_id, program_count + 1);
147 header_rec.push_tag(b"ID", &id_tag);
148 header_rec.push_tag(b"PN", program_name);
150 let re_pp = Regex::new(r"@PG\tID:([^\t]+)").unwrap();
152 let last_program = re_pp.captures_iter(&header_string).last();
153 if let Some(last_program) = last_program {
154 let last_program = last_program[1].to_string();
155 log::trace!("last program {}", last_program);
156 header_rec.push_tag(b"PP", &last_program);
157 };
158 header_rec.push_tag(b"VN", program_version);
160 let cli = env::args().join(" ");
161 header_rec.push_tag(b"CL", &cli);
163 header.push_record(&header_rec);
164 log::trace!("{:?}", String::from_utf8_lossy(&header.to_bytes()));
165
166 let mut out = if out == "-" {
168 bam::Writer::from_stdout(&header, bam::Format::Bam).unwrap()
169 } else {
170 bam::Writer::from_path(out, &header, bam::Format::Bam).unwrap()
171 };
172 out.set_threads(threads).unwrap();
173 out
174}
175
176pub fn bam_reader(bam: &str, threads: usize) -> bam::Reader {
178 let mut bam = if bam == "-" {
179 bam::Reader::from_stdin().unwrap_or_else(|_| panic!("Failed to open bam from stdin"))
180 } else {
181 bam::Reader::from_path(bam).unwrap_or_else(|_| panic!("Failed to open {}", bam))
182 };
183 bam.set_threads(threads).unwrap();
184 bam
185}
186pub struct BamChunk<'a> {
188 pub bam: bam::Records<'a, bam::Reader>,
189 pub chunk_size: usize,
190 pub pre_chunk_done: u64,
191 pub bar: ProgressBar,
192}
193
194impl<'a> BamChunk<'a> {
195 pub fn new(bam: bam::Records<'a, bam::Reader>, chunk_size: Option<usize>) -> Self {
196 let chunk_size = chunk_size.unwrap_or_else(|| current_num_threads() * 100);
197 let bar = no_length_progress_bar();
198 Self {
199 bam,
200 chunk_size,
201 pre_chunk_done: 0,
202 bar,
203 }
204 }
205}
206
207impl<'a> Iterator for BamChunk<'a> {
209 type Item = Vec<bam::Record>;
211
212 fn next(&mut self) -> Option<Self::Item> {
218 self.bar.inc(self.pre_chunk_done);
220
221 let start = Instant::now();
222 let mut cur_vec = vec![];
223 for r in self.bam.by_ref().take(self.chunk_size) {
224 let r = r.unwrap();
225 cur_vec.push(r);
226 }
227
228 self.pre_chunk_done = cur_vec.len() as u64;
230 self.bar.inc_length(self.pre_chunk_done);
231
232 if cur_vec.is_empty() {
234 None
235 } else {
236 let duration = start.elapsed().as_secs_f64();
237 log::debug!(
238 "Read {} bam records at {}.",
239 format!("{:}", cur_vec.len()).bright_magenta().bold(),
240 format!("{:.2?} reads/s", cur_vec.len() as f64 / duration)
241 .bright_cyan()
242 .bold(),
243 );
244 Some(cur_vec)
245 }
246 }
247}
248
249#[derive(Clone, Debug)]
250pub enum PbChem {
251 Two,
252 TwoPointTwo,
253 ThreePointTwo,
254 Revio,
255}
256
257pub fn find_pb_polymerase(header: &bam::Header) -> PbChem {
258 lazy_static! {
259 static ref CHEMISTRY_MAP: HashMap<String, PbChem> = HashMap::from([
260 ("101-789-500".to_string(), PbChem::Two),
262 ("101-820-500".to_string(), PbChem::Two),
264 ("101-894-200".to_string(), PbChem::TwoPointTwo),
266 ("102-194-200".to_string(), PbChem::Two),
268 ("102-194-100".to_string(), PbChem::ThreePointTwo),
270 ("102-739-100".to_string(), PbChem::Revio)
272 ]);
273 }
274 lazy_static! {
275 static ref MM_DS: regex::Regex =
276 regex::Regex::new(r".*READTYPE=([^;]+);.*BINDINGKIT=([^;]+);").unwrap();
277 }
278 let z = header.to_hashmap();
279 let rg = z.get("RG").expect("RG tag missing from bam file");
280 let mut read_type = "";
281 let mut binding_kit = "";
282 for tag in rg {
283 for (tag, val) in tag {
284 if tag == "DS" {
285 for cap in MM_DS.captures_iter(val) {
286 read_type = cap.get(1).map_or("", |m| m.as_str());
287 binding_kit = cap.get(2).map_or("", |m| m.as_str());
288 }
289 }
290 }
291 }
292 if env::var("FT_REVIO").is_ok() {
294 binding_kit = "102-739-100";
295 }
296 assert_eq!(read_type, "CCS");
298 let chemistry = CHEMISTRY_MAP.get(binding_kit).unwrap_or_else(|| {
300 log::warn!(
301 "Polymerase for BINDINGKIT={} not found. Defaulting to ML model made for REVIO.",
302 binding_kit
303 );
304 &PbChem::Revio
305 });
306
307 let chem = match chemistry {
309 PbChem::Two => "2.0",
310 PbChem::TwoPointTwo => "2.2",
311 PbChem::ThreePointTwo => "3.2",
312 PbChem::Revio => "Revio",
313 };
314 log::info!(
315 "Bam header implies PacBio chemistry {} binding kit {}.",
316 chem,
317 binding_kit
318 );
319 chemistry.clone()
320}
321
322pub fn get_u32_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<i64> {
336 if let Ok(Aux::ArrayU32(array)) = record.aux(tag) {
337 let read_array = array.iter().map(|x| x as i64).collect::<Vec<_>>();
338 read_array
339 } else {
340 vec![]
341 }
342}
343
344pub fn get_u8_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<u8> {
345 if let Ok(Aux::ArrayU8(array)) = record.aux(tag) {
346 let read_array = array.iter().collect::<Vec<_>>();
347 read_array
348 } else {
349 vec![]
350 }
351}
352
353pub fn get_pb_u16_tag_as_u8(record: &bam::Record, tag: &[u8; 2]) -> Vec<u8> {
355 if let Ok(Aux::ArrayU16(array)) = record.aux(tag) {
356 let read_array = array.iter().collect::<Vec<_>>();
357 read_array
358 .iter()
359 .map(|&x| {
360 if x < 64 {
361 x
362 } else if x < 191 {
363 (x - 64) / 2 + 64
364 } else if x < 445 {
365 (x - 192) / 4 + 128
366 } else if x < 953 {
367 (x - 448) / 8 + 192
368 } else {
369 255
370 }
371 })
372 .map(|x| x as u8)
373 .collect()
374 } else {
375 vec![]
376 }
377}
378
379pub fn get_f32_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<f32> {
380 if let Ok(Aux::ArrayFloat(array)) = record.aux(tag) {
381 let read_array = array.iter().collect::<Vec<_>>();
382 read_array
383 } else {
384 vec![]
385 }
386}