1use anyhow::{Error, Result};
3use grep_cli::stdout;
4use gzp::{deflate::Bgzf, BgzfSyncReader, Compression, ZBuilder};
5use lazy_static::lazy_static;
6use log::{error, warn};
7use std::{
8 ffi::OsStr,
9 fs::File,
10 io::{self, BufReader, BufWriter, Read, Write},
11 path::Path,
12};
13use termcolor::ColorChoice;
14
15pub fn set_rayon_global_pools_size(size: usize) -> Result<()> {
21 let cpus = determine_allowed_cpus(size)?;
22 rayon::ThreadPoolBuilder::new()
23 .num_threads(cpus)
24 .build_global()?;
25 Ok(())
26}
27
28#[inline]
31pub fn is_broken_pipe(err: &Error) -> bool {
32 if let Some(io_err) = err.root_cause().downcast_ref::<io::Error>() {
33 if io_err.kind() == io::ErrorKind::BrokenPipe {
34 return true;
35 }
36 }
37 false
38}
39
40pub fn determine_allowed_cpus(desired: usize) -> Result<usize> {
48 if desired == 0 {
49 error!("Must select > 0 threads");
50 Err(Error::msg("Too few threads selected. Min 4"))
51 } else if desired > num_cpus::get() {
52 let cpus = num_cpus::get();
53 warn!("Specified more threads than are available, using {}", cpus);
54 Ok(cpus)
55 } else {
56 Ok(desired)
57 }
58}
59
60pub fn is_bgzipped<P: AsRef<Path>>(path: P) -> bool {
62 let ext = path.as_ref().extension().unwrap_or_else(|| OsStr::new(""));
63 ext == "gz" || ext == "gzip" || ext == "bgzf"
64}
65
66pub fn get_reader<P: AsRef<Path>>(
76 path: &Option<P>,
77 has_headers: bool,
78 bgzipped: bool,
79) -> Result<csv::Reader<Box<dyn Read>>> {
80 let raw_reader: Box<dyn Read> = match &path {
81 Some(path) if path.as_ref().to_str().unwrap() != "-" => {
82 let reader = BufReader::new(File::open(path)?);
83 if bgzipped {
84 Box::new(BgzfSyncReader::new(reader))
85 } else {
86 Box::new(reader)
87 }
88 }
89 _ => {
90 let reader = std::io::stdin();
91 if bgzipped {
92 Box::new(BgzfSyncReader::new(reader))
93 } else {
94 Box::new(reader)
95 }
96 }
97 };
98
99 Ok(csv::ReaderBuilder::new()
100 .delimiter(b'\t')
101 .has_headers(has_headers)
102 .from_reader(raw_reader))
103}
104
105pub fn get_writer<P: AsRef<Path>>(
115 path: &Option<P>,
116 bgzipped: bool,
117 write_headers: bool,
118 threads: usize,
119 compression_level: u32,
120) -> Result<csv::Writer<Box<dyn Write>>> {
121 let raw_writer: Box<dyn Write> = match &path {
122 Some(path) if path.as_ref().to_str().unwrap() != "-" => {
123 let writer = BufWriter::new(File::create(path)?);
124 if bgzipped {
125 Box::new(
126 ZBuilder::<Bgzf, _>::new()
127 .num_threads(threads)
128 .compression_level(Compression::new(compression_level))
129 .from_writer(writer),
130 )
131 } else {
132 Box::new(writer)
133 }
134 }
135 _ => {
136 let writer = stdout(ColorChoice::Never);
137 if bgzipped {
138 Box::new(
139 ZBuilder::<Bgzf, _>::new()
140 .num_threads(threads)
141 .compression_level(Compression::new(compression_level))
142 .from_writer(writer),
143 )
144 } else {
145 Box::new(writer)
146 }
147 }
148 };
149 Ok(csv::WriterBuilder::new()
150 .delimiter(b'\t')
151 .has_headers(write_headers)
152 .from_writer(raw_writer))
153}
154
155lazy_static! {
156 pub static ref NUM_CPU: String = num_cpus::get().to_string();
158}