use std::collections::HashSet;
use std::ffi::{OsStr, OsString};
use std::fmt::{Debug, Display, Formatter};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};
use std::thread;
use std::time::Duration;
use crate::redactions::Redactor;
use color_eyre::Result;
use duct::{Expression, IntoExecutablePath};
use eyre::{Context, bail};
#[cfg(not(any(test, target_os = "windows")))]
use signal_hook::consts::{SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2};
#[cfg(not(any(test, target_os = "windows")))]
use signal_hook::iterator::Signals;
use std::sync::LazyLock as Lazy;
use crate::config::Settings;
use crate::env;
use crate::env::PATH_KEY;
use crate::errors::Error::ScriptFailed;
use crate::file::display_path;
use crate::path_env::PathEnv;
use crate::ui::progress_report::SingleReport;
#[macro_export]
macro_rules! cmd {
( $program:expr $(, $arg:expr )* $(,)? ) => {
{
use std::ffi::OsString;
let args: std::vec::Vec<OsString> = std::vec![$( Into::<OsString>::into($arg) ),*];
$crate::cmd::cmd($program, args)
}
};
}
pub fn cmd<T, U>(program: T, args: U) -> Expression
where
T: IntoExecutablePath,
U: IntoIterator,
U::Item: Into<OsString>,
{
let program = program.to_executable();
let args: Vec<OsString> = args.into_iter().map(Into::<OsString>::into).collect();
let display_name = program.to_string_lossy();
let display_args = args
.iter()
.map(|s| s.to_string_lossy())
.collect::<Vec<_>>()
.join(" ");
let display_command = [display_name.into(), display_args].join(" ");
debug!("$ {display_command}");
duct::cmd(program, args)
}
pub struct CmdLineRunner<'a> {
cmd: Command,
pr: Option<&'a dyn SingleReport>,
pr_arc: Option<Arc<Box<dyn SingleReport>>>,
stdin: Option<String>,
redactor: Redactor,
raw: bool,
pass_signals: bool,
on_stdout: Option<Box<dyn Fn(String) + Send + 'a>>,
on_stderr: Option<Box<dyn Fn(String) + Send + 'a>>,
timeout: Option<Duration>,
sandbox: Option<crate::sandbox::SandboxConfig>,
}
const GUARD_RUNNING: u8 = 0;
const GUARD_CANCELLED: u8 = 1;
const GUARD_TIMED_OUT: u8 = 2;
fn wait_for_cancel_or_deadline<'a>(
cvar: &'a Condvar,
mut guard: MutexGuard<'a, bool>,
deadline: std::time::Instant,
) -> (MutexGuard<'a, bool>, bool) {
loop {
if *guard {
return (guard, true);
}
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
if remaining.is_zero() {
return (guard, false);
}
let (g, result) = cvar.wait_timeout(guard, remaining).unwrap();
guard = g;
if result.timed_out() {
return (guard, false);
}
}
}
struct TimeoutGuard {
state: Arc<AtomicU8>,
cancel: Arc<(Mutex<bool>, Condvar)>,
timeout: Duration,
}
impl TimeoutGuard {
fn new(timeout: Duration, pid: u32) -> Self {
let state = Arc::new(AtomicU8::new(GUARD_RUNNING));
let cancel = Arc::new((Mutex::new(false), Condvar::new()));
let state_clone = state.clone();
let cancel_clone = cancel.clone();
thread::spawn(move || {
let (lock, cvar) = &*cancel_clone;
let guard = lock.lock().unwrap();
let deadline = std::time::Instant::now() + timeout;
let (guard, cancelled) = wait_for_cancel_or_deadline(cvar, guard, deadline);
if cancelled {
return;
}
if state_clone
.compare_exchange(
GUARD_RUNNING,
GUARD_TIMED_OUT,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_err()
{
return;
}
#[cfg(unix)]
{
let pid = nix::unistd::Pid::from_raw(pid as i32);
let _ = nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGTERM);
drop(guard);
let guard = lock.lock().unwrap();
let grace_deadline = std::time::Instant::now() + Duration::from_secs(5);
let (_guard, cancelled) = wait_for_cancel_or_deadline(cvar, guard, grace_deadline);
if !cancelled {
let _ = nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGKILL);
}
}
#[cfg(windows)]
{
drop(guard);
let _ = Command::new("taskkill")
.args(["/F", "/PID", &pid.to_string()])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
}
});
Self {
state,
cancel,
timeout,
}
}
fn cancel(&self) {
self.state
.compare_exchange(
GUARD_RUNNING,
GUARD_CANCELLED,
Ordering::AcqRel,
Ordering::Acquire,
)
.ok();
let (lock, cvar) = &*self.cancel;
*lock.lock().unwrap() = true;
cvar.notify_one();
}
fn timed_out(&self) -> Option<Duration> {
(self.state.load(Ordering::Acquire) == GUARD_TIMED_OUT).then_some(self.timeout)
}
}
impl Drop for TimeoutGuard {
fn drop(&mut self) {
self.cancel();
}
}
static OUTPUT_LOCK: Mutex<()> = Mutex::new(());
static RUNNING_PIDS: Lazy<Mutex<HashSet<u32>>> = Lazy::new(Default::default);
impl<'a> CmdLineRunner<'a> {
pub fn new<P: AsRef<OsStr>>(program: P) -> Self {
let mut cmd = Command::new(program);
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
Self {
cmd,
pr: None,
pr_arc: None,
stdin: None,
redactor: Default::default(),
raw: false,
pass_signals: false,
on_stdout: None,
on_stderr: None,
timeout: None,
sandbox: None,
}
}
pub fn with_sandbox(mut self, sandbox: crate::sandbox::SandboxConfig) -> Self {
if sandbox.is_active() {
self.sandbox = Some(sandbox);
}
self
}
#[cfg(unix)]
pub fn kill_all(signal: nix::sys::signal::Signal) {
let pids = RUNNING_PIDS.lock().unwrap();
for pid in pids.iter() {
let pid = *pid as i32;
trace!("{signal}: {pid}");
if let Err(e) = nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), signal) {
debug!("Failed to kill cmd {pid}: {e}");
}
}
}
#[cfg(windows)]
pub fn kill_all() {
let pids = RUNNING_PIDS.lock().unwrap();
for pid in pids.iter() {
if let Err(e) = Command::new("taskkill")
.arg("/F")
.arg("/T")
.arg("/PID")
.arg(pid.to_string())
.spawn()
{
warn!("Failed to kill cmd {pid}: {e}");
}
}
}
pub fn stdin<T: Into<Stdio>>(mut self, cfg: T) -> Self {
self.cmd.stdin(cfg);
self
}
pub fn stdout<T: Into<Stdio>>(mut self, cfg: T) -> Self {
self.cmd.stdout(cfg);
self
}
pub fn stderr<T: Into<Stdio>>(mut self, cfg: T) -> Self {
self.cmd.stderr(cfg);
self
}
pub fn redact(mut self, redactions: impl IntoIterator<Item = String>) -> Self {
self.redactor = self.redactor.with_additional(redactions);
self
}
pub fn with_on_stdout<F: Fn(String) + Send + 'a>(mut self, on_stdout: F) -> Self {
self.on_stdout = Some(Box::new(on_stdout));
self
}
pub fn with_on_stderr<F: Fn(String) + Send + 'a>(mut self, on_stderr: F) -> Self {
self.on_stderr = Some(Box::new(on_stderr));
self
}
pub fn current_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
self.cmd.current_dir(dir);
self
}
pub fn env_clear(mut self) -> Self {
self.cmd.env_clear();
self
}
pub fn env<K, V>(mut self, key: K, val: V) -> Self
where
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.cmd.env(key, val);
self
}
pub fn envs<I, K, V>(mut self, vars: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.cmd.envs(vars);
self
}
pub fn prepend_path(mut self, paths: Vec<PathBuf>) -> eyre::Result<Self> {
let existing = self
.get_env(&PATH_KEY)
.map(|c| c.to_owned())
.unwrap_or_else(|| env::var_os(&*PATH_KEY).unwrap());
let mut path_env = PathEnv::from_iter(env::split_paths(&existing));
for p in paths {
path_env.add(p);
}
self.cmd.env(&*PATH_KEY, path_env.join());
Ok(self)
}
fn get_env(&self, key: &str) -> Option<&OsStr> {
for (k, v) in self.cmd.get_envs() {
if k == key {
return v;
}
}
None
}
pub fn opt_args<S: AsRef<OsStr>>(mut self, arg: &str, values: Option<Vec<S>>) -> Self {
if let Some(values) = values {
for value in values {
self.cmd.arg(arg);
self.cmd.arg(value);
}
}
self
}
pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
self.cmd.arg(arg.as_ref());
self
}
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
self.cmd.args(args);
self
}
pub fn with_pr(mut self, pr: &'a dyn SingleReport) -> Self {
self.pr = Some(pr);
self
}
pub fn with_pr_arc(mut self, pr: Arc<Box<dyn SingleReport>>) -> Self {
self.pr_arc = Some(pr);
self
}
pub fn raw(mut self, raw: bool) -> Self {
self.raw = raw;
self
}
pub fn with_pass_signals(&mut self) -> &mut Self {
self.pass_signals = true;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn stdin_string(mut self, input: impl Into<String>) -> Self {
self.cmd.stdin(Stdio::piped());
self.stdin = Some(input.into());
self
}
#[allow(clippy::readonly_write_lock)]
pub fn execute(mut self) -> Result<()> {
static RAW_LOCK: RwLock<()> = RwLock::new(());
let read_lock = RAW_LOCK.read().unwrap();
debug!("$ {self}");
if Settings::get().raw || self.raw {
drop(read_lock);
let _write_lock = RAW_LOCK.write().unwrap();
return self.execute_raw();
}
let mut cp = self
.spawn_with_etxtbsy_retry()
.wrap_err_with(|| format!("failed to execute command: {self}"))?;
let id = cp.id();
RUNNING_PIDS.lock().unwrap().insert(id);
trace!("Started process: {id} for {}", self.get_program());
let (tx, rx) = channel();
if let Some(stdout) = cp.stdout.take() {
thread::spawn({
let name = self.to_string();
let tx = tx.clone();
move || {
for line in BufReader::new(stdout).lines() {
match line {
Ok(line) => {
let _ = tx.send(ChildProcessOutput::Stdout(line));
}
Err(e) => warn!("Failed to read stdout for {name}: {e}"),
}
}
}
});
}
if let Some(stderr) = cp.stderr.take() {
thread::spawn({
let name = self.to_string();
let tx = tx.clone();
move || {
for line in BufReader::new(stderr).lines() {
match line {
Ok(line) => {
let _ = tx.send(ChildProcessOutput::Stderr(line));
}
Err(e) => warn!("Failed to read stderr for {name}: {e}"),
}
}
}
});
}
if let Some(text) = self.stdin.take() {
let mut stdin = cp.stdin.take().unwrap();
thread::spawn(move || {
stdin.write_all(text.as_bytes()).unwrap();
});
}
#[cfg(not(any(test, target_os = "windows")))]
let mut sighandle = None;
#[cfg(not(any(test, target_os = "windows")))]
if self.pass_signals {
let mut signals =
Signals::new([SIGINT, SIGTERM, SIGTERM, SIGHUP, SIGQUIT, SIGUSR1, SIGUSR2])?;
sighandle = Some(signals.handle());
let tx = tx.clone();
thread::spawn(move || {
for sig in &mut signals {
let _ = tx.send(ChildProcessOutput::Signal(sig));
}
});
}
thread::spawn(move || {
let status = cp.wait().unwrap();
#[cfg(not(any(test, target_os = "windows")))]
if let Some(sighandle) = sighandle {
sighandle.close();
}
let _ = tx.send(ChildProcessOutput::ExitStatus(status));
});
let timeout_guard = self.timeout.map(|t| TimeoutGuard::new(t, id));
let mut combined_output = vec![];
let mut status = None;
for line in rx {
match line {
ChildProcessOutput::Stdout(line) => {
let line = self.redactor.redact(&line);
self.on_stdout(line.clone());
combined_output.push((line, OutputSource::Stdout));
}
ChildProcessOutput::Stderr(line) => {
let line = self.redactor.redact(&line);
self.on_stderr(line.clone());
combined_output.push((line, OutputSource::Stderr));
}
ChildProcessOutput::ExitStatus(s) => {
status = Some(s);
}
#[cfg(not(any(test, windows)))]
ChildProcessOutput::Signal(sig) => {
if sig != SIGINT {
debug!("Received signal {sig}, forwarding to {id}");
let pid = nix::unistd::Pid::from_raw(id as i32);
let sig = nix::sys::signal::Signal::try_from(sig).unwrap();
let _ = nix::sys::signal::kill(pid, sig);
}
}
}
}
RUNNING_PIDS.lock().unwrap().remove(&id);
if let Some(g) = &timeout_guard {
g.cancel();
}
let status = status.unwrap();
if !status.success() {
if let Some(duration) = timeout_guard.as_ref().and_then(|g| g.timed_out()) {
bail!("timed out after {duration:?}");
}
self.on_error(combined_output, status)?;
}
Ok(())
}
fn execute_raw(mut self) -> Result<()> {
if self.stdin.is_none() {
self.cmd.stdin(Stdio::inherit());
}
self.cmd.stdout(Stdio::inherit());
self.cmd.stderr(Stdio::inherit());
let mut cp = self.spawn_with_etxtbsy_retry()?;
let timeout_guard = self.timeout.map(|t| TimeoutGuard::new(t, cp.id()));
let status = cp.wait()?;
if let Some(g) = &timeout_guard {
g.cancel();
}
if !status.success() {
if let Some(duration) = timeout_guard.as_ref().and_then(|g| g.timed_out()) {
bail!("timed out after {duration:?}");
}
return self.on_error(vec![], status);
}
Ok(())
}
fn spawn_with_etxtbsy_retry(&mut self) -> std::io::Result<std::process::Child> {
let mut attempt = 0;
loop {
match self.cmd.spawn() {
Ok(child) => return Ok(child),
Err(err) if Self::is_etxtbsy(&err) && attempt < 3 => {
attempt += 1;
trace!("retrying spawn after ETXTBSY (attempt {}/3)", attempt);
std::thread::sleep(std::time::Duration::from_millis(50 * (1 << (attempt - 1))));
}
Err(err) => return Err(err),
}
}
}
pub async fn apply_sandbox(&mut self) -> eyre::Result<()> {
let Some(sandbox) = self.sandbox.take() else {
return Ok(());
};
if !sandbox.is_active() {
return Ok(());
}
#[cfg(target_os = "linux")]
if !sandbox.allow_net.is_empty() {
eyre::bail!(
"per-host network filtering (--allow-net=<host>) is not supported on Linux. \
Use --deny-net to block all network, or remove --allow-net."
);
}
#[cfg(target_os = "linux")]
{
if sandbox.effective_deny_env() {
self.cmd.env_clear();
}
use std::os::unix::process::CommandExt;
let sandbox = sandbox.clone();
unsafe {
self.cmd.pre_exec(move || {
if sandbox.effective_deny_read() || sandbox.effective_deny_write() {
crate::sandbox::landlock_apply(&sandbox)
.map_err(|e| std::io::Error::other(e.to_string()))?;
}
if sandbox.effective_deny_net() {
crate::sandbox::seccomp_apply()
.map_err(|e| std::io::Error::other(e.to_string()))?;
}
Ok(())
});
}
}
#[cfg(target_os = "macos")]
{
let program = self.cmd.get_program().to_os_string();
let args: Vec<String> = self
.cmd
.get_args()
.map(|a| a.to_string_lossy().into_owned())
.collect();
let profile = crate::sandbox::macos_generate_profile(&sandbox).await;
let mut new_cmd = Command::new("sandbox-exec");
new_cmd.arg("-p").arg(&profile).arg("--").arg(&program);
for arg in &args {
new_cmd.arg(arg);
}
new_cmd.stdin(Stdio::null());
new_cmd.stdout(Stdio::piped());
new_cmd.stderr(Stdio::piped());
if let Some(dir) = self.cmd.get_current_dir() {
new_cmd.current_dir(dir);
}
if sandbox.effective_deny_env() {
new_cmd.env_clear();
}
for (k, v) in self.cmd.get_envs() {
if let Some(v) = v {
new_cmd.env(k, v);
}
}
self.cmd = new_cmd;
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = sandbox;
warn!("sandbox is not supported on this platform, running unsandboxed");
}
Ok(())
}
#[cfg(unix)]
fn is_etxtbsy(err: &std::io::Error) -> bool {
err.raw_os_error() == Some(nix::errno::Errno::ETXTBSY as i32)
}
#[cfg(not(unix))]
fn is_etxtbsy(_err: &std::io::Error) -> bool {
false
}
fn on_stdout(&self, line: String) {
let _lock = OUTPUT_LOCK.lock().unwrap();
if let Some(on_stdout) = &self.on_stdout {
on_stdout(line);
return;
}
if let Some(pr) = self
.pr
.or(self.pr_arc.as_ref().map(|arc| arc.as_ref().as_ref()))
{
if !line.trim().is_empty() {
pr.set_message(line)
}
} else {
let mut stdout = std::io::stdout().lock();
let _ = if console::colors_enabled() {
writeln!(stdout, "{line}\x1b[0m")
} else {
writeln!(stdout, "{line}")
};
}
}
fn on_stderr(&self, line: String) {
let _lock = OUTPUT_LOCK.lock().unwrap();
if let Some(on_stderr) = &self.on_stderr {
on_stderr(line);
return;
}
match self
.pr
.or(self.pr_arc.as_ref().map(|arc| arc.as_ref().as_ref()))
{
Some(pr) => {
if !line.trim().is_empty() {
pr.println(line)
}
}
None => {
let mut stderr = std::io::stderr().lock();
let _ = if console::colors_enabled_stderr() {
writeln!(stderr, "{line}\x1b[0m")
} else {
writeln!(stderr, "{line}")
};
}
}
}
fn on_error(&self, output: Vec<(String, OutputSource)>, status: ExitStatus) -> Result<()> {
match self
.pr
.or(self.pr_arc.as_ref().map(|arc| arc.as_ref().as_ref()))
{
Some(pr) => {
error!("{} failed", self.get_program());
if self.on_stdout.is_none() {
let stdout_only: String = output
.into_iter()
.filter(|(_, source)| matches!(source, OutputSource::Stdout))
.map(|(line, _)| line)
.collect::<Vec<_>>()
.join("\n");
if !stdout_only.trim().is_empty() {
pr.println(stdout_only);
}
}
}
None => {
}
}
Err(ScriptFailed(self.get_program(), Some(status)))?
}
fn get_program(&self) -> String {
display_path(PathBuf::from(self.cmd.get_program()))
}
fn get_args(&self) -> Vec<String> {
self.cmd
.get_args()
.map(|s| s.to_string_lossy().to_string())
.collect::<Vec<_>>()
}
}
impl Display for CmdLineRunner<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let args = self.get_args().join(" ");
write!(f, "{} {args}", self.get_program())
}
}
impl Debug for CmdLineRunner<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let args = self.get_args().join(" ");
write!(f, "{} {args}", self.get_program())
}
}
enum OutputSource {
Stdout,
Stderr,
}
enum ChildProcessOutput {
Stdout(String),
Stderr(String),
ExitStatus(ExitStatus),
#[cfg(not(any(test, target_os = "windows")))]
Signal(i32),
}
pub async fn cmd_read_async<I, K, V>(program: &str, args: &[&str], env: I) -> Result<String>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
let display_args = args.join(" ");
debug!("$ {program} {display_args}");
let output = tokio::process::Command::new(program)
.args(args)
.env_clear()
.envs(env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.output()
.await
.wrap_err_with(|| format!("failed to execute command: {program} {display_args}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!(
"{program} {display_args} failed: exit code {}\n{}",
output.status.code().unwrap_or(-1),
stderr.trim()
);
}
let stdout = String::from_utf8(output.stdout)
.wrap_err_with(|| format!("{program} produced invalid UTF-8 output"))?;
Ok(stdout.trim_end().to_string())
}
pub async fn cmd_read_async_inherited_env<I, K, V>(
program: &str,
args: &[&str],
extra_env: I,
) -> Result<String>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
let display_args = args.join(" ");
debug!("$ {program} {display_args}");
let output = tokio::process::Command::new(program)
.args(args)
.envs(extra_env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.output()
.await
.wrap_err_with(|| format!("failed to execute command: {program} {display_args}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!(
"{program} {display_args} failed: exit code {}\n{}",
output.status.code().unwrap_or(-1),
stderr.trim()
);
}
let stdout = String::from_utf8(output.stdout)
.wrap_err_with(|| format!("{program} produced invalid UTF-8 output"))?;
Ok(stdout.trim_end().to_string())
}
#[cfg(test)]
#[cfg(unix)]
mod tests {
use pretty_assertions::assert_eq;
use crate::config::Config;
#[tokio::test]
async fn test_cmd() {
let _config = Config::get().await.unwrap();
let output = cmd!("echo", "foo", "bar").read().unwrap();
assert_eq!("foo bar", output);
}
}