bio_io/
lib.rs

1use anyhow::Result;
2use colored::Colorize;
3use gzp::deflate::Bgzf; //, Gzip, Mgzip, RawDeflate};
4use gzp::{Compression, ZBuilder};
5use indicatif::{ProgressBar, ProgressStyle};
6use itertools::Itertools;
7use lazy_static::lazy_static;
8use niffler::get_reader;
9use rayon::current_num_threads;
10use regex::Regex;
11use rust_htslib::bam;
12use rust_htslib::bam::record::Aux;
13use rust_htslib::bam::Read;
14use std::collections::HashMap;
15use std::env;
16use std::ffi::OsStr;
17use std::fs::File;
18use std::io::{self, stdout, BufReader, BufWriter, Write};
19use std::path::{Path, PathBuf};
20use std::process::exit;
21use std::time::Instant;
22
23const BUFFER_SIZE: usize = 32 * 1024;
24const COMPRESSION_THREADS: usize = 8;
25const COMPRESSION_LEVEL: u32 = 6;
26
27/*
28PROGRESS BARS
29*/
30pub fn no_length_progress_bar() -> ProgressBar {
31    let progress_style_no_length = format!(
32        "{}    {} {} {} {} {}",
33        "[{elapsed_precise:.yellow.bold}]",
34        "Records".cyan(),
35        "{per_sec:<15.cyan}",
36        "Read".blue(),
37        "{human_pos:.blue}",
38        "records".blue(),
39    );
40    let style = ProgressStyle::default_bar()
41        .template(&progress_style_no_length)
42        .unwrap();
43    let bar = ProgressBar::new(0);
44    bar.set_style(style);
45    let finish = indicatif::ProgressFinish::AndLeave;
46    let bar = bar.with_finish(finish);
47    bar
48}
49
50/*
51STANDARD FILE IO
52*/
53
54/// Get a buffered output writer from stdout or a file
55fn get_output(path: Option<PathBuf>) -> Result<Box<dyn Write + Send + 'static>> {
56    let writer: Box<dyn Write + Send + 'static> = match path {
57        Some(path) => {
58            if path.as_os_str() == "-" {
59                Box::new(BufWriter::with_capacity(BUFFER_SIZE, stdout()))
60            } else {
61                Box::new(BufWriter::with_capacity(BUFFER_SIZE, File::create(path)?))
62            }
63        }
64        None => Box::new(BufWriter::with_capacity(BUFFER_SIZE, stdout())),
65    };
66    Ok(writer)
67}
68
69/// Write normal or compressed files seamlessly
70/// Uses the presence of a `.gz` extension to decide
71// Attempting to have a file writer too
72pub fn writer(filename: &str) -> Result<Box<dyn Write>> {
73    let ext = Path::new(filename).extension();
74    let path = PathBuf::from(filename);
75    let buffer = get_output(Some(path))?;
76    if ext == Some(OsStr::new("gz")) || ext == Some(OsStr::new("bgz")) {
77        let writer = ZBuilder::<Bgzf, _>::new()
78            .num_threads(COMPRESSION_THREADS)
79            .compression_level(Compression::new(COMPRESSION_LEVEL))
80            .from_writer(buffer);
81        Ok(Box::new(writer))
82    } else {
83        Ok(buffer)
84    }
85}
86
87/// write to a file, but don't error on broken pipes
88pub fn write_to_file(out: &str, buffer: &mut Box<dyn Write>) {
89    let out = write!(buffer, "{}", out);
90    if let Err(err) = out {
91        if err.kind() == io::ErrorKind::BrokenPipe {
92            exit(0);
93        } else {
94            panic!("Error: {}", err);
95        }
96    }
97}
98
99/// a reader that can read compressed files but also stdin (indicated by -)
100/// ```
101/// use bio_io::buffer_from;
102/// use std::io;
103/// let reader = buffer_from("../tests/data/test.txt.gz").expect("Error: cannot open file");
104/// let msg = io::read_to_string(reader).unwrap();
105/// assert_eq!(msg, "Hello World!\n");
106/// let reader = buffer_from("../tests/data/test.txt").expect("Error: cannot open file");
107/// let msg = io::read_to_string(reader).unwrap();
108/// assert_eq!(msg, "Hello World!\n");
109/// ```
110pub fn buffer_from<P: AsRef<Path>>(
111    path: P,
112) -> Result<BufReader<Box<dyn std::io::Read>>, anyhow::Error> {
113    let path = path.as_ref();
114    let readable: Box<dyn std::io::Read> = if path == Path::new("-") {
115        Box::new(io::BufReader::new(io::stdin()))
116    } else {
117        Box::new(io::BufReader::new(std::fs::File::open(path)?))
118    };
119    let (reader, compression) = get_reader(readable)?;
120    log::debug!("Compression: {:?}", compression);
121    let buffer = BufReader::new(reader);
122    Ok(buffer)
123}
124
125/*
126BAM IO
127*/
128
129/// Write to a bam file.
130pub fn program_bam_writer(
131    out: &str,
132    template_bam: &bam::Reader,
133    threads: usize,
134    program_name: &str,
135    program_id: &str,
136    program_version: &str,
137) -> bam::Writer {
138    let mut header = bam::Header::from_template(template_bam.header());
139
140    // add to the header
141    let header_string = String::from_utf8_lossy(&header.to_bytes()).to_string();
142    let mut header_rec = bam::header::HeaderRecord::new(b"PG");
143    // ID
144    let tag = format!("PN:{program_name}");
145    let program_count = header_string.matches(&tag).count();
146    let id_tag = format!("{}.{}", program_id, program_count + 1);
147    header_rec.push_tag(b"ID", &id_tag);
148    // PN
149    header_rec.push_tag(b"PN", program_name);
150    // PP
151    let re_pp = Regex::new(r"@PG\tID:([^\t]+)").unwrap();
152    let last_program = re_pp.captures_iter(&header_string).last();
153    if let Some(last_program) = last_program {
154        let last_program = last_program[1].to_string();
155        log::trace!("last program {}", last_program);
156        header_rec.push_tag(b"PP", &last_program);
157    };
158    // VN
159    header_rec.push_tag(b"VN", program_version);
160    let cli = env::args().join(" ");
161    // CL
162    header_rec.push_tag(b"CL", &cli);
163    header.push_record(&header_rec);
164    log::trace!("{:?}", String::from_utf8_lossy(&header.to_bytes()));
165
166    // make the writer
167    let mut out = if out == "-" {
168        bam::Writer::from_stdout(&header, bam::Format::Bam).unwrap()
169    } else {
170        bam::Writer::from_path(out, &header, bam::Format::Bam).unwrap()
171    };
172    out.set_threads(threads).unwrap();
173    out
174}
175
176/// Open bam file
177pub fn bam_reader(bam: &str, threads: usize) -> bam::Reader {
178    let mut bam = if bam == "-" {
179        bam::Reader::from_stdin().unwrap_or_else(|_| panic!("Failed to open bam from stdin"))
180    } else {
181        bam::Reader::from_path(bam).unwrap_or_else(|_| panic!("Failed to open {}", bam))
182    };
183    bam.set_threads(threads).unwrap();
184    bam
185}
186// This is a bam chunk reader
187pub struct BamChunk<'a> {
188    pub bam: bam::Records<'a, bam::Reader>,
189    pub chunk_size: usize,
190    pub pre_chunk_done: u64,
191    pub bar: ProgressBar,
192}
193
194impl<'a> BamChunk<'a> {
195    pub fn new(bam: bam::Records<'a, bam::Reader>, chunk_size: Option<usize>) -> Self {
196        let chunk_size = chunk_size.unwrap_or_else(|| current_num_threads() * 100);
197        let bar = no_length_progress_bar();
198        Self {
199            bam,
200            chunk_size,
201            pre_chunk_done: 0,
202            bar,
203        }
204    }
205}
206
207// The `Iterator` trait only requires a method to be defined for the `next` element.
208impl<'a> Iterator for BamChunk<'a> {
209    // We can refer to this type using Self::Item
210    type Item = Vec<bam::Record>;
211
212    // The return type is `Option<T>`:
213    //     * When the `Iterator` is finished, `None` is returned.
214    //     * Otherwise, the next value is wrapped in `Some` and returned.
215    // We use Self::Item in the return type, so we can change
216    // the type without having to update the function signatures.
217    fn next(&mut self) -> Option<Self::Item> {
218        // update progress bar with results from previous iteration
219        self.bar.inc(self.pre_chunk_done);
220
221        let start = Instant::now();
222        let mut cur_vec = vec![];
223        for r in self.bam.by_ref().take(self.chunk_size) {
224            let r = r.unwrap();
225            cur_vec.push(r);
226        }
227
228        // extend progress bar
229        self.pre_chunk_done = cur_vec.len() as u64;
230        self.bar.inc_length(self.pre_chunk_done);
231
232        // return
233        if cur_vec.is_empty() {
234            None
235        } else {
236            let duration = start.elapsed().as_secs_f64();
237            log::debug!(
238                "Read {} bam records at {}.",
239                format!("{:}", cur_vec.len()).bright_magenta().bold(),
240                format!("{:.2?} reads/s", cur_vec.len() as f64 / duration)
241                    .bright_cyan()
242                    .bold(),
243            );
244            Some(cur_vec)
245        }
246    }
247}
248
249#[derive(Clone, Debug)]
250pub enum PbChem {
251    Two,
252    TwoPointTwo,
253    ThreePointTwo,
254    Revio,
255}
256
257pub fn find_pb_polymerase(header: &bam::Header) -> PbChem {
258    lazy_static! {
259        static ref CHEMISTRY_MAP: HashMap<String, PbChem> = HashMap::from([
260            // regular 2.0
261            ("101-789-500".to_string(), PbChem::Two),
262            // this is actually 2.1 we didn't have training data for it
263            ("101-820-500".to_string(), PbChem::Two),
264            // regular 2.2
265            ("101-894-200".to_string(), PbChem::TwoPointTwo),
266            // is really 3.1 but has polymerase of 2.1, and we need to make that 2.0
267            ("102-194-200".to_string(), PbChem::Two),
268            // regular 3.2
269            ("102-194-100".to_string(), PbChem::ThreePointTwo),
270            // Revio has kinetics most similar to 2.2
271            ("102-739-100".to_string(), PbChem::Revio)
272        ]);
273    }
274    lazy_static! {
275        static ref MM_DS: regex::Regex =
276            regex::Regex::new(r".*READTYPE=([^;]+);.*BINDINGKIT=([^;]+);").unwrap();
277    }
278    let z = header.to_hashmap();
279    let rg = z.get("RG").expect("RG tag missing from bam file");
280    let mut read_type = "";
281    let mut binding_kit = "";
282    for tag in rg {
283        for (tag, val) in tag {
284            if tag == "DS" {
285                for cap in MM_DS.captures_iter(val) {
286                    read_type = cap.get(1).map_or("", |m| m.as_str());
287                    binding_kit = cap.get(2).map_or("", |m| m.as_str());
288                }
289            }
290        }
291    }
292    // force revio model
293    if env::var("FT_REVIO").is_ok() {
294        binding_kit = "102-739-100";
295    }
296    // make sure read-type is CCS
297    assert_eq!(read_type, "CCS");
298    // grab chemistry
299    let chemistry = CHEMISTRY_MAP.get(binding_kit).unwrap_or_else(|| {
300        log::warn!(
301            "Polymerase for BINDINGKIT={} not found. Defaulting to ML model made for REVIO.",
302            binding_kit
303        );
304        &PbChem::Revio
305    });
306
307    // log the chem being used
308    let chem = match chemistry {
309        PbChem::Two => "2.0",
310        PbChem::TwoPointTwo => "2.2",
311        PbChem::ThreePointTwo => "3.2",
312        PbChem::Revio => "Revio",
313    };
314    log::info!(
315        "Bam header implies PacBio chemistry {} binding kit {}.",
316        chem,
317        binding_kit
318    );
319    chemistry.clone()
320}
321
322///```
323/// use rust_htslib::{bam, bam::Read};
324/// use log;
325/// let mut bam = bam::Reader::from_path(&"../tests/data/all.bam").unwrap();
326/// for record in bam.records() {
327///     let record = record.unwrap();
328///     let n_s = bio_io::get_u32_tag(&record, b"ns");
329///     let n_l = bio_io::get_u32_tag(&record, b"nl");
330///     let a_s = bio_io::get_u32_tag(&record, b"as");
331///     let a_l = bio_io::get_u32_tag(&record, b"al");
332///     log::debug!("{:?}", a_s);
333/// }
334///```
335pub fn get_u32_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<i64> {
336    if let Ok(Aux::ArrayU32(array)) = record.aux(tag) {
337        let read_array = array.iter().map(|x| x as i64).collect::<Vec<_>>();
338        read_array
339    } else {
340        vec![]
341    }
342}
343
344pub fn get_u8_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<u8> {
345    if let Ok(Aux::ArrayU8(array)) = record.aux(tag) {
346        let read_array = array.iter().collect::<Vec<_>>();
347        read_array
348    } else {
349        vec![]
350    }
351}
352
353/// Convert the PacBio u16 tag into the u8 tag we normally expect.
354pub fn get_pb_u16_tag_as_u8(record: &bam::Record, tag: &[u8; 2]) -> Vec<u8> {
355    if let Ok(Aux::ArrayU16(array)) = record.aux(tag) {
356        let read_array = array.iter().collect::<Vec<_>>();
357        read_array
358            .iter()
359            .map(|&x| {
360                if x < 64 {
361                    x
362                } else if x < 191 {
363                    (x - 64) / 2 + 64
364                } else if x < 445 {
365                    (x - 192) / 4 + 128
366                } else if x < 953 {
367                    (x - 448) / 8 + 192
368                } else {
369                    255
370                }
371            })
372            .map(|x| x as u8)
373            .collect()
374    } else {
375        vec![]
376    }
377}
378
379pub fn get_f32_tag(record: &bam::Record, tag: &[u8; 2]) -> Vec<f32> {
380    if let Ok(Aux::ArrayFloat(array)) = record.aux(tag) {
381        let read_array = array.iter().collect::<Vec<_>>();
382        read_array
383    } else {
384        vec![]
385    }
386}