perbase_lib/
utils.rs

1//! General utility methods.
2use anyhow::{Error, Result};
3use grep_cli::stdout;
4use gzp::{deflate::Bgzf, BgzfSyncReader, Compression, ZBuilder};
5use lazy_static::lazy_static;
6use log::{error, warn};
7use std::{
8    ffi::OsStr,
9    fs::File,
10    io::{self, BufReader, BufWriter, Read, Write},
11    path::Path,
12};
13use termcolor::ColorChoice;
14
15/// Set rayon global thread pool size.
16///
17/// # Errors
18///
19/// - [`Error`] if an issue is encountered determening the allowed cpus.
20pub fn set_rayon_global_pools_size(size: usize) -> Result<()> {
21    let cpus = determine_allowed_cpus(size)?;
22    rayon::ThreadPoolBuilder::new()
23        .num_threads(cpus)
24        .build_global()?;
25    Ok(())
26}
27
28/// Check if err is a broken pipe.
29/// Check if err is a broken pipe.
30#[inline]
31pub fn is_broken_pipe(err: &Error) -> bool {
32    if let Some(io_err) = err.root_cause().downcast_ref::<io::Error>() {
33        if io_err.kind() == io::ErrorKind::BrokenPipe {
34            return true;
35        }
36    }
37    false
38}
39
40/// Check that specified `desired` is valid.
41///
42/// If more threads are specified than available, the max available are used.
43///
44/// # Errors
45///
46/// - [`Error`] if less than or equal to 0 threads are selected
47pub fn determine_allowed_cpus(desired: usize) -> Result<usize> {
48    if desired == 0 {
49        error!("Must select > 0 threads");
50        Err(Error::msg("Too few threads selected. Min 4"))
51    } else if desired > num_cpus::get() {
52        let cpus = num_cpus::get();
53        warn!("Specified more threads than are available, using {}", cpus);
54        Ok(cpus)
55    } else {
56        Ok(desired)
57    }
58}
59
/// Detect whether a path ends with a gzip/BGZF file extension.
///
/// Returns `true` when the final extension is exactly `gz`, `gzip`, or `bgzf`
/// (case-sensitive). Paths without an extension, or whose extension is not valid UTF-8,
/// return `false`. Only the path is inspected; the file itself is not opened.
pub fn is_bgzipped<P: AsRef<Path>>(path: P) -> bool {
    matches!(
        path.as_ref().extension().and_then(OsStr::to_str),
        Some("gz") | Some("gzip") | Some("bgzf")
    )
}
65
66/// Get a CSV Reader
67///
68/// # Errors
69///
70/// - If unable to open file
71///
72/// # Panics
73///
74/// - If unable to parse input path
75pub fn get_reader<P: AsRef<Path>>(
76    path: &Option<P>,
77    has_headers: bool,
78    bgzipped: bool,
79) -> Result<csv::Reader<Box<dyn Read>>> {
80    let raw_reader: Box<dyn Read> = match &path {
81        Some(path) if path.as_ref().to_str().unwrap() != "-" => {
82            let reader = BufReader::new(File::open(path)?);
83            if bgzipped {
84                Box::new(BgzfSyncReader::new(reader))
85            } else {
86                Box::new(reader)
87            }
88        }
89        _ => {
90            let reader = std::io::stdin();
91            if bgzipped {
92                Box::new(BgzfSyncReader::new(reader))
93            } else {
94                Box::new(reader)
95            }
96        }
97    };
98
99    Ok(csv::ReaderBuilder::new()
100        .delimiter(b'\t')
101        .has_headers(has_headers)
102        .from_reader(raw_reader))
103}
104
105/// Open a CSV Writer to a file or stdout.
106///
107/// # Errors
108///
109/// - If file can't be created
110///
111/// # Panics
112///
113/// - If unable to parse input path
114pub fn get_writer<P: AsRef<Path>>(
115    path: &Option<P>,
116    bgzipped: bool,
117    write_headers: bool,
118    threads: usize,
119    compression_level: u32,
120) -> Result<csv::Writer<Box<dyn Write>>> {
121    let raw_writer: Box<dyn Write> = match &path {
122        Some(path) if path.as_ref().to_str().unwrap() != "-" => {
123            let writer = BufWriter::new(File::create(path)?);
124            if bgzipped {
125                Box::new(
126                    ZBuilder::<Bgzf, _>::new()
127                        .num_threads(threads)
128                        .compression_level(Compression::new(compression_level))
129                        .from_writer(writer),
130                )
131            } else {
132                Box::new(writer)
133            }
134        }
135        _ => {
136            let writer = stdout(ColorChoice::Never);
137            if bgzipped {
138                Box::new(
139                    ZBuilder::<Bgzf, _>::new()
140                        .num_threads(threads)
141                        .compression_level(Compression::new(compression_level))
142                        .from_writer(writer),
143                )
144            } else {
145                Box::new(writer)
146            }
147        }
148    };
149    Ok(csv::WriterBuilder::new()
150        .delimiter(b'\t')
151        .has_headers(write_headers)
152        .from_writer(raw_writer))
153}
154
155lazy_static! {
156    /// Return the number of cpus as an &str
157    pub static ref NUM_CPU: String = num_cpus::get().to_string();
158}