use crate::mock_command::{CommandChild, RunCommand};
use crate::util::fs::File;
use blake3::Hasher as blake3_Hasher;
use byteorder::{BigEndian, ByteOrder};
use serde::Serialize;
use std::ffi::{OsStr, OsString};
use std::hash::Hasher;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::process::{self, Stdio};
use std::time;
use std::time::Duration;
pub use fs_err as fs;
use crate::errors::*;
#[derive(Clone)]
pub struct Digest {
inner: blake3_Hasher,
}
impl Digest {
pub fn new() -> Digest {
Digest {
inner: blake3_Hasher::new(),
}
}
pub async fn file<T>(path: T, pool: &tokio::runtime::Handle) -> Result<String>
where
T: AsRef<Path>,
{
Self::reader(path.as_ref().to_owned(), pool).await
}
pub fn reader_sync<R: Read>(mut reader: R) -> Result<String> {
let mut m = Digest::new();
let mut buffer = [0; 128 * 1024];
loop {
let count = reader.read(&mut buffer[..])?;
if count == 0 {
break;
}
m.update(&buffer[..count]);
}
Ok(m.finish())
}
pub async fn reader(path: PathBuf, pool: &tokio::runtime::Handle) -> Result<String> {
pool.spawn_blocking(move || {
let reader = File::open(&path)
.with_context(|| format!("Failed to open file for hashing: {:?}", path))?;
Digest::reader_sync(reader)
})
.await?
}
pub fn update(&mut self, bytes: &[u8]) {
self.inner.update(bytes);
}
pub fn finish(self) -> String {
hex(self.inner.finalize().as_bytes())
}
}
impl Default for Digest {
fn default() -> Self {
Self::new()
}
}
pub fn hex(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for &byte in bytes {
s.push(hex(byte & 0xf));
s.push(hex((byte >> 4) & 0xf));
}
return s;
fn hex(byte: u8) -> char {
match byte {
0..=9 => (b'0' + byte) as char,
_ => (b'a' + byte - 10) as char,
}
}
}
pub async fn hash_all(files: &[PathBuf], pool: &tokio::runtime::Handle) -> Result<Vec<String>> {
let start = time::Instant::now();
let count = files.len();
let iter = files.iter().map(move |f| Digest::file(f, pool));
let hashes = futures::future::try_join_all(iter).await?;
trace!(
"Hashed {} files in {}",
count,
fmt_duration_as_secs(&start.elapsed())
);
Ok(hashes)
}
pub fn fmt_duration_as_secs(duration: &Duration) -> String {
format!("{}.{:03} s", duration.as_secs(), duration.subsec_millis())
}
async fn wait_with_input_output<T>(mut child: T, input: Option<Vec<u8>>) -> Result<process::Output>
where
T: CommandChild + 'static,
{
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let stdin = input.and_then(|i| {
child.take_stdin().map(|mut stdin| async move {
stdin.write_all(&i).await.context("failed to write stdin")
})
});
let stdout = child.take_stdout();
let stdout = async move {
match stdout {
Some(mut stdout) => {
let mut buf = Vec::new();
stdout
.read_to_end(&mut buf)
.await
.context("failed to read stdout")?;
Result::Ok(Some(buf))
}
None => Ok(None),
}
};
let stderr = child.take_stderr();
let stderr = async move {
match stderr {
Some(mut stderr) => {
let mut buf = Vec::new();
stderr
.read_to_end(&mut buf)
.await
.context("failed to read stderr")?;
Result::Ok(Some(buf))
}
None => Ok(None),
}
};
let status = async move {
if let Some(stdin) = stdin {
let _ = stdin.await;
}
child.wait().await.context("failed to wait for child")
};
let (status, stdout, stderr) = futures::future::try_join3(status, stdout, stderr).await?;
Ok(process::Output {
status,
stdout: stdout.unwrap_or_default(),
stderr: stderr.unwrap_or_default(),
})
}
pub async fn run_input_output<C>(mut command: C, input: Option<Vec<u8>>) -> Result<process::Output>
where
C: RunCommand,
{
let child = command
.no_console()
.stdin(if input.is_some() {
Stdio::piped()
} else {
Stdio::inherit()
})
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.await?;
wait_with_input_output(child, input)
.await
.and_then(|output| {
if output.status.success() {
Ok(output)
} else {
Err(ProcessError(output).into())
}
})
}
pub fn write_length_prefixed_bincode<W, S>(mut writer: W, data: S) -> Result<()>
where
W: Write,
S: Serialize,
{
let bytes = bincode::serialize(&data)?;
let mut len = [0; 4];
BigEndian::write_u32(&mut len, bytes.len() as u32);
writer.write_all(&len)?;
writer.write_all(&bytes)?;
writer.flush()?;
Ok(())
}
pub trait OsStrExt {
fn starts_with(&self, s: &str) -> bool;
fn split_prefix(&self, s: &str) -> Option<OsString>;
}
#[cfg(unix)]
use std::os::unix::ffi::OsStrExt as _OsStrExt;
#[cfg(unix)]
impl OsStrExt for OsStr {
fn starts_with(&self, s: &str) -> bool {
self.as_bytes().starts_with(s.as_bytes())
}
fn split_prefix(&self, s: &str) -> Option<OsString> {
let bytes = self.as_bytes();
if bytes.starts_with(s.as_bytes()) {
Some(OsStr::from_bytes(&bytes[s.len()..]).to_owned())
} else {
None
}
}
}
#[cfg(windows)]
use std::os::windows::ffi::{OsStrExt as _OsStrExt, OsStringExt};
#[cfg(windows)]
impl OsStrExt for OsStr {
fn starts_with(&self, s: &str) -> bool {
let u16s = self.encode_wide();
let mut utf8 = s.chars();
for codepoint in u16s {
let to_match = match utf8.next() {
Some(ch) => ch,
None => return true,
};
let to_match = to_match as u32;
let codepoint = codepoint as u32;
if to_match < 0xd7ff {
if to_match != codepoint {
return false;
}
} else {
return false;
}
}
utf8.next().is_none()
}
fn split_prefix(&self, s: &str) -> Option<OsString> {
let mut u16s = self.encode_wide().peekable();
let mut utf8 = s.chars();
while let Some(&codepoint) = u16s.peek() {
let to_match = match utf8.next() {
Some(ch) => ch,
None => {
let codepoints = u16s.collect::<Vec<_>>();
return Some(OsString::from_wide(&codepoints));
}
};
let to_match = to_match as u32;
let codepoint = codepoint as u32;
if to_match < 0xd7ff {
if to_match != codepoint {
return None;
}
} else {
return None;
}
u16s.next();
}
if utf8.next().is_none() {
Some(OsString::new())
} else {
None
}
}
}
pub struct HashToDigest<'a> {
pub digest: &'a mut Digest,
}
impl<'a> Hasher for HashToDigest<'a> {
fn write(&mut self, bytes: &[u8]) {
self.digest.update(bytes)
}
fn finish(&self) -> u64 {
panic!("not supposed to be called");
}
}
pub fn ref_env(env: &[(OsString, OsString)]) -> impl Iterator<Item = (&OsString, &OsString)> {
env.iter().map(|&(ref k, ref v)| (k, v))
}
#[cfg(feature = "hyperx")]
pub use self::http_extension::{HeadersExt, RequestExt};
#[cfg(feature = "hyperx")]
mod http_extension {
use reqwest::header::{HeaderMap, HeaderValue};
use std::fmt;
pub trait HeadersExt {
fn set<H>(&mut self, header: H)
where
H: hyperx::header::Header + fmt::Display;
fn get_hyperx<H>(&self) -> Option<H>
where
H: hyperx::header::Header;
}
impl HeadersExt for HeaderMap {
fn set<H>(&mut self, header: H)
where
H: hyperx::header::Header + fmt::Display,
{
self.insert(
H::header_name(),
HeaderValue::from_maybe_shared(header.to_string()).unwrap(),
);
}
fn get_hyperx<H>(&self) -> Option<H>
where
H: hyperx::header::Header,
{
http::HeaderMap::get(self, H::header_name())
.and_then(|header| H::parse_header(&header.as_bytes().into()).ok())
}
}
pub trait RequestExt {
fn set_header<H>(self, header: H) -> Self
where
H: hyperx::header::Header + fmt::Display;
}
impl RequestExt for http::request::Builder {
fn set_header<H>(self, header: H) -> Self
where
H: hyperx::header::Header + fmt::Display,
{
self.header(
H::header_name(),
HeaderValue::from_maybe_shared(header.to_string()).unwrap(),
)
}
}
impl RequestExt for http::response::Builder {
fn set_header<H>(self, header: H) -> Self
where
H: hyperx::header::Header + fmt::Display,
{
self.header(
H::header_name(),
HeaderValue::from_maybe_shared(header.to_string()).unwrap(),
)
}
}
#[cfg(feature = "reqwest")]
impl RequestExt for ::reqwest::RequestBuilder {
fn set_header<H>(self, header: H) -> Self
where
H: hyperx::header::Header + fmt::Display,
{
self.header(
H::header_name(),
HeaderValue::from_maybe_shared(header.to_string()).unwrap(),
)
}
}
}
#[cfg(not(windows))]
pub fn daemonize() -> Result<()> {
use daemonize::Daemonize;
use std::env;
use std::mem;
match env::var("CACHEPOT_NO_DAEMON") {
Ok(ref val) if val == "1" => {}
_ => {
Daemonize::new().start().context("failed to daemonize")?;
}
}
static mut PREV_SIGSEGV: *mut libc::sigaction = 0 as *mut _;
static mut PREV_SIGBUS: *mut libc::sigaction = 0 as *mut _;
static mut PREV_SIGILL: *mut libc::sigaction = 0 as *mut _;
unsafe {
match env::var("CACHEPOT_ALLOW_CORE_DUMPS") {
Ok(ref val) if val == "1" => {
let rlim = libc::rlimit {
rlim_cur: libc::RLIM_INFINITY,
rlim_max: libc::RLIM_INFINITY,
};
libc::setrlimit(libc::RLIMIT_CORE, &rlim);
}
_ => {}
}
PREV_SIGSEGV = Box::into_raw(Box::new(mem::zeroed::<libc::sigaction>()));
PREV_SIGBUS = Box::into_raw(Box::new(mem::zeroed::<libc::sigaction>()));
PREV_SIGILL = Box::into_raw(Box::new(mem::zeroed::<libc::sigaction>()));
let mut new: libc::sigaction = mem::zeroed();
new.sa_sigaction = handler as usize;
new.sa_flags = libc::SA_SIGINFO | libc::SA_RESTART;
libc::sigaction(libc::SIGSEGV, &new, &mut *PREV_SIGSEGV);
libc::sigaction(libc::SIGBUS, &new, &mut *PREV_SIGBUS);
libc::sigaction(libc::SIGILL, &new, &mut *PREV_SIGILL);
}
return Ok(());
extern "C" fn handler(
signum: libc::c_int,
_info: *mut libc::siginfo_t,
_ptr: *mut libc::c_void,
) {
use std::fmt::{Result, Write};
struct Stderr;
impl Write for Stderr {
fn write_str(&mut self, s: &str) -> Result {
unsafe {
let bytes = s.as_bytes();
libc::write(libc::STDERR_FILENO, bytes.as_ptr() as *const _, bytes.len());
Ok(())
}
}
}
unsafe {
let _ = writeln!(Stderr, "signal {} received", signum);
match signum {
libc::SIGBUS => libc::sigaction(signum, &*PREV_SIGBUS, std::ptr::null_mut()),
libc::SIGILL => libc::sigaction(signum, &*PREV_SIGILL, std::ptr::null_mut()),
_ => libc::sigaction(signum, &*PREV_SIGSEGV, std::ptr::null_mut()),
};
}
}
}
#[cfg(windows)]
pub fn daemonize() -> Result<()> {
Ok(())
}
#[cfg(any(feature = "dist-client", feature = "dist-worker"))]
pub fn native_tls_no_sni_client_builder<'a, I, T>(root_certs: I) -> Result<reqwest::ClientBuilder>
where
I: Iterator<Item = T>,
T: AsRef<[u8]>,
{
let mut tls_builder = native_tls::TlsConnector::builder();
for root_cert in root_certs {
tls_builder.add_root_certificate(native_tls::Certificate::from_pem(root_cert.as_ref())?);
}
tls_builder.use_sni(false);
let tls = tls_builder.build()?;
let client_builder = reqwest::ClientBuilder::new()
.use_native_tls()
.use_preconfigured_tls(tls);
Ok(client_builder)
}
#[cfg(test)]
mod tests {
use super::OsStrExt;
use std::ffi::{OsStr, OsString};
#[test]
fn simple_starts_with() {
let a: &OsStr = "foo".as_ref();
assert!(a.starts_with(""));
assert!(a.starts_with("f"));
assert!(a.starts_with("fo"));
assert!(a.starts_with("foo"));
assert!(!a.starts_with("foo2"));
assert!(!a.starts_with("b"));
assert!(!a.starts_with("b"));
let a: &OsStr = "".as_ref();
assert!(!a.starts_with("a"))
}
#[test]
fn simple_strip_prefix() {
let a: &OsStr = "foo".as_ref();
assert_eq!(a.split_prefix(""), Some(OsString::from("foo")));
assert_eq!(a.split_prefix("f"), Some(OsString::from("oo")));
assert_eq!(a.split_prefix("fo"), Some(OsString::from("o")));
assert_eq!(a.split_prefix("foo"), Some(OsString::from("")));
assert_eq!(a.split_prefix("foo2"), None);
assert_eq!(a.split_prefix("b"), None);
}
}