use std::collections::HashSet;
use std::ffi::{OsStr, OsString};
use std::fmt::{Debug, Display, Formatter};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};
use std::thread;
use std::time::Duration;
use crate::redactions::Redactor;
use color_eyre::Result;
use duct::{Expression, IntoExecutablePath};
use eyre::{Context, bail};
#[cfg(not(any(test, target_os = "windows")))]
use signal_hook::consts::{SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2};
#[cfg(not(any(test, target_os = "windows")))]
use signal_hook::iterator::Signals;
use std::sync::LazyLock as Lazy;
use crate::config::Settings;
use crate::env;
use crate::env::PATH_KEY;
use crate::errors::Error::ScriptFailed;
use crate::file::display_path;
use crate::path_env::PathEnv;
use crate::ui::progress_report::SingleReport;
#[macro_export]
macro_rules! cmd {
( $program:expr $(, $arg:expr )* $(,)? ) => {
{
use std::ffi::OsString;
let args: std::vec::Vec<OsString> = std::vec![$( Into::<OsString>::into($arg) ),*];
$crate::cmd::cmd($program, args)
}
};
}
pub fn cmd<T, U>(program: T, args: U) -> Expression
where
T: IntoExecutablePath,
U: IntoIterator,
U::Item: Into<OsString>,
{
let program = program.to_executable();
let args: Vec<OsString> = args.into_iter().map(Into::<OsString>::into).collect();
let display_name = program.to_string_lossy();
let display_args = args
.iter()
.map(|s| s.to_string_lossy())
.collect::<Vec<_>>()
.join(" ");
let display_command = [display_name.into(), display_args].join(" ");
debug!("$ {display_command}");
duct::cmd(program, args)
}
pub struct CmdLineRunner<'a> {
cmd: Command,
pr: Option<&'a dyn SingleReport>,
pr_arc: Option<Arc<Box<dyn SingleReport>>>,
stdin: Option<String>,
redactor: Redactor,
raw: bool,
pass_signals: bool,
on_stdout: Option<Box<dyn Fn(String) + Send + 'a>>,
on_stderr: Option<Box<dyn Fn(String) + Send + 'a>>,
timeout: Option<Duration>,
}
const GUARD_RUNNING: u8 = 0;
const GUARD_CANCELLED: u8 = 1;
const GUARD_TIMED_OUT: u8 = 2;
fn wait_for_cancel_or_deadline<'a>(
cvar: &'a Condvar,
mut guard: MutexGuard<'a, bool>,
deadline: std::time::Instant,
) -> (MutexGuard<'a, bool>, bool) {
loop {
if *guard {
return (guard, true);
}
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
if remaining.is_zero() {
return (guard, false);
}
let (g, result) = cvar.wait_timeout(guard, remaining).unwrap();
guard = g;
if result.timed_out() {
return (guard, false);
}
}
}
struct TimeoutGuard {
state: Arc<AtomicU8>,
cancel: Arc<(Mutex<bool>, Condvar)>,
timeout: Duration,
}
impl TimeoutGuard {
fn new(timeout: Duration, pid: u32) -> Self {
let state = Arc::new(AtomicU8::new(GUARD_RUNNING));
let cancel = Arc::new((Mutex::new(false), Condvar::new()));
let state_clone = state.clone();
let cancel_clone = cancel.clone();
thread::spawn(move || {
let (lock, cvar) = &*cancel_clone;
let guard = lock.lock().unwrap();
let deadline = std::time::Instant::now() + timeout;
let (guard, cancelled) = wait_for_cancel_or_deadline(cvar, guard, deadline);
if cancelled {
return;
}
if state_clone
.compare_exchange(
GUARD_RUNNING,
GUARD_TIMED_OUT,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_err()
{
return;
}
#[cfg(unix)]
{
let pid = nix::unistd::Pid::from_raw(pid as i32);
let _ = nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGTERM);
drop(guard);
let guard = lock.lock().unwrap();
let grace_deadline = std::time::Instant::now() + Duration::from_secs(5);
let (_guard, cancelled) = wait_for_cancel_or_deadline(cvar, guard, grace_deadline);
if !cancelled {
let _ = nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGKILL);
}
}
#[cfg(windows)]
{
drop(guard);
let _ = Command::new("taskkill")
.args(["/F", "/PID", &pid.to_string()])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
}
});
Self {
state,
cancel,
timeout,
}
}
fn cancel(&self) {
self.state
.compare_exchange(
GUARD_RUNNING,
GUARD_CANCELLED,
Ordering::AcqRel,
Ordering::Acquire,
)
.ok();
let (lock, cvar) = &*self.cancel;
*lock.lock().unwrap() = true;
cvar.notify_one();
}
fn timed_out(&self) -> Option<Duration> {
(self.state.load(Ordering::Acquire) == GUARD_TIMED_OUT).then_some(self.timeout)
}
}
impl Drop for TimeoutGuard {
fn drop(&mut self) {
self.cancel();
}
}
static OUTPUT_LOCK: Mutex<()> = Mutex::new(());
static RUNNING_PIDS: Lazy<Mutex<HashSet<u32>>> = Lazy::new(Default::default);
impl<'a> CmdLineRunner<'a> {
pub fn new<P: AsRef<OsStr>>(program: P) -> Self {
let mut cmd = Command::new(program);
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
Self {
cmd,
pr: None,
pr_arc: None,
stdin: None,
redactor: Default::default(),
raw: false,
pass_signals: false,
on_stdout: None,
on_stderr: None,
timeout: None,
}
}
#[cfg(unix)]
pub fn kill_all(signal: nix::sys::signal::Signal) {
let pids = RUNNING_PIDS.lock().unwrap();
for pid in pids.iter() {
let pid = *pid as i32;
trace!("{signal}: {pid}");
if let Err(e) = nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), signal) {
debug!("Failed to kill cmd {pid}: {e}");
}
}
}
#[cfg(windows)]
pub fn kill_all() {
let pids = RUNNING_PIDS.lock().unwrap();
for pid in pids.iter() {
if let Err(e) = Command::new("taskkill")
.arg("/F")
.arg("/T")
.arg("/PID")
.arg(pid.to_string())
.spawn()
{
warn!("Failed to kill cmd {pid}: {e}");
}
}
}
pub fn stdin<T: Into<Stdio>>(mut self, cfg: T) -> Self {
self.cmd.stdin(cfg);
self
}
pub fn stdout<T: Into<Stdio>>(mut self, cfg: T) -> Self {
self.cmd.stdout(cfg);
self
}
pub fn stderr<T: Into<Stdio>>(mut self, cfg: T) -> Self {
self.cmd.stderr(cfg);
self
}
pub fn redact(mut self, redactions: impl IntoIterator<Item = String>) -> Self {
self.redactor = self.redactor.with_additional(redactions);
self
}
pub fn with_on_stdout<F: Fn(String) + Send + 'a>(mut self, on_stdout: F) -> Self {
self.on_stdout = Some(Box::new(on_stdout));
self
}
pub fn with_on_stderr<F: Fn(String) + Send + 'a>(mut self, on_stderr: F) -> Self {
self.on_stderr = Some(Box::new(on_stderr));
self
}
pub fn current_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
self.cmd.current_dir(dir);
self
}
pub fn env_clear(mut self) -> Self {
self.cmd.env_clear();
self
}
pub fn env<K, V>(mut self, key: K, val: V) -> Self
where
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.cmd.env(key, val);
self
}
pub fn envs<I, K, V>(mut self, vars: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.cmd.envs(vars);
self
}
pub fn prepend_path(mut self, paths: Vec<PathBuf>) -> eyre::Result<Self> {
let existing = self
.get_env(&PATH_KEY)
.map(|c| c.to_owned())
.unwrap_or_else(|| env::var_os(&*PATH_KEY).unwrap());
let mut path_env = PathEnv::from_iter(env::split_paths(&existing));
for p in paths {
path_env.add(p);
}
self.cmd.env(&*PATH_KEY, path_env.join());
Ok(self)
}
fn get_env(&self, key: &str) -> Option<&OsStr> {
for (k, v) in self.cmd.get_envs() {
if k == key {
return v;
}
}
None
}
pub fn opt_args<S: AsRef<OsStr>>(mut self, arg: &str, values: Option<Vec<S>>) -> Self {
if let Some(values) = values {
for value in values {
self.cmd.arg(arg);
self.cmd.arg(value);
}
}
self
}
pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
self.cmd.arg(arg.as_ref());
self
}
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
self.cmd.args(args);
self
}
pub fn with_pr(mut self, pr: &'a dyn SingleReport) -> Self {
self.pr = Some(pr);
self
}
pub fn with_pr_arc(mut self, pr: Arc<Box<dyn SingleReport>>) -> Self {
self.pr_arc = Some(pr);
self
}
pub fn raw(mut self, raw: bool) -> Self {
self.raw = raw;
self
}
pub fn with_pass_signals(&mut self) -> &mut Self {
self.pass_signals = true;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn stdin_string(mut self, input: impl Into<String>) -> Self {
self.cmd.stdin(Stdio::piped());
self.stdin = Some(input.into());
self
}
#[allow(clippy::readonly_write_lock)]
pub fn execute(mut self) -> Result<()> {
static RAW_LOCK: RwLock<()> = RwLock::new(());
let read_lock = RAW_LOCK.read().unwrap();
debug!("$ {self}");
if Settings::get().raw || self.raw {
drop(read_lock);
let _write_lock = RAW_LOCK.write().unwrap();
return self.execute_raw();
}
let mut cp = self
.spawn_with_etxtbsy_retry()
.wrap_err_with(|| format!("failed to execute command: {self}"))?;
let id = cp.id();
RUNNING_PIDS.lock().unwrap().insert(id);
trace!("Started process: {id} for {}", self.get_program());
let (tx, rx) = channel();
if let Some(stdout) = cp.stdout.take() {
thread::spawn({
let name = self.to_string();
let tx = tx.clone();
move || {
for line in BufReader::new(stdout).lines() {
match line {
Ok(line) => {
let _ = tx.send(ChildProcessOutput::Stdout(line));
}
Err(e) => warn!("Failed to read stdout for {name}: {e}"),
}
}
}
});
}
if let Some(stderr) = cp.stderr.take() {
thread::spawn({
let name = self.to_string();
let tx = tx.clone();
move || {
for line in BufReader::new(stderr).lines() {
match line {
Ok(line) => {
let _ = tx.send(ChildProcessOutput::Stderr(line));
}
Err(e) => warn!("Failed to read stderr for {name}: {e}"),
}
}
}
});
}
if let Some(text) = self.stdin.take() {
let mut stdin = cp.stdin.take().unwrap();
thread::spawn(move || {
stdin.write_all(text.as_bytes()).unwrap();
});
}
#[cfg(not(any(test, target_os = "windows")))]
let mut sighandle = None;
#[cfg(not(any(test, target_os = "windows")))]
if self.pass_signals {
let mut signals =
Signals::new([SIGINT, SIGTERM, SIGTERM, SIGHUP, SIGQUIT, SIGUSR1, SIGUSR2])?;
sighandle = Some(signals.handle());
let tx = tx.clone();
thread::spawn(move || {
for sig in &mut signals {
let _ = tx.send(ChildProcessOutput::Signal(sig));
}
});
}
thread::spawn(move || {
let status = cp.wait().unwrap();
#[cfg(not(any(test, target_os = "windows")))]
if let Some(sighandle) = sighandle {
sighandle.close();
}
let _ = tx.send(ChildProcessOutput::ExitStatus(status));
});
let timeout_guard = self.timeout.map(|t| TimeoutGuard::new(t, id));
let mut combined_output = vec![];
let mut status = None;
for line in rx {
match line {
ChildProcessOutput::Stdout(line) => {
let line = self.redactor.redact(&line);
self.on_stdout(line.clone());
combined_output.push((line, OutputSource::Stdout));
}
ChildProcessOutput::Stderr(line) => {
let line = self.redactor.redact(&line);
self.on_stderr(line.clone());
combined_output.push((line, OutputSource::Stderr));
}
ChildProcessOutput::ExitStatus(s) => {
status = Some(s);
}
#[cfg(not(any(test, windows)))]
ChildProcessOutput::Signal(sig) => {
if sig != SIGINT {
debug!("Received signal {sig}, forwarding to {id}");
let pid = nix::unistd::Pid::from_raw(id as i32);
let sig = nix::sys::signal::Signal::try_from(sig).unwrap();
let _ = nix::sys::signal::kill(pid, sig);
}
}
}
}
RUNNING_PIDS.lock().unwrap().remove(&id);
if let Some(g) = &timeout_guard {
g.cancel();
}
let status = status.unwrap();
if !status.success() {
if let Some(duration) = timeout_guard.as_ref().and_then(|g| g.timed_out()) {
bail!("timed out after {duration:?}");
}
self.on_error(combined_output, status)?;
}
Ok(())
}
fn execute_raw(mut self) -> Result<()> {
let mut cp = self.spawn_with_etxtbsy_retry()?;
let timeout_guard = self.timeout.map(|t| TimeoutGuard::new(t, cp.id()));
let status = cp.wait()?;
if let Some(g) = &timeout_guard {
g.cancel();
}
if !status.success() {
if let Some(duration) = timeout_guard.as_ref().and_then(|g| g.timed_out()) {
bail!("timed out after {duration:?}");
}
return self.on_error(vec![], status);
}
Ok(())
}
fn spawn_with_etxtbsy_retry(&mut self) -> std::io::Result<std::process::Child> {
let mut attempt = 0;
loop {
match self.cmd.spawn() {
Ok(child) => return Ok(child),
Err(err) if Self::is_etxtbsy(&err) && attempt < 3 => {
attempt += 1;
trace!("retrying spawn after ETXTBSY (attempt {}/3)", attempt);
std::thread::sleep(std::time::Duration::from_millis(50 * (1 << (attempt - 1))));
}
Err(err) => return Err(err),
}
}
}
#[cfg(unix)]
fn is_etxtbsy(err: &std::io::Error) -> bool {
err.raw_os_error() == Some(nix::errno::Errno::ETXTBSY as i32)
}
#[cfg(not(unix))]
fn is_etxtbsy(_err: &std::io::Error) -> bool {
false
}
fn on_stdout(&self, line: String) {
let _lock = OUTPUT_LOCK.lock().unwrap();
if let Some(on_stdout) = &self.on_stdout {
on_stdout(line);
return;
}
if let Some(pr) = self
.pr
.or(self.pr_arc.as_ref().map(|arc| arc.as_ref().as_ref()))
{
if !line.trim().is_empty() {
pr.set_message(line)
}
} else {
let mut stdout = std::io::stdout().lock();
let _ = if console::colors_enabled() {
writeln!(stdout, "{line}\x1b[0m")
} else {
writeln!(stdout, "{line}")
};
}
}
fn on_stderr(&self, line: String) {
let _lock = OUTPUT_LOCK.lock().unwrap();
if let Some(on_stderr) = &self.on_stderr {
on_stderr(line);
return;
}
match self
.pr
.or(self.pr_arc.as_ref().map(|arc| arc.as_ref().as_ref()))
{
Some(pr) => {
if !line.trim().is_empty() {
pr.println(line)
}
}
None => {
let mut stderr = std::io::stderr().lock();
let _ = if console::colors_enabled_stderr() {
writeln!(stderr, "{line}\x1b[0m")
} else {
writeln!(stderr, "{line}")
};
}
}
}
fn on_error(&self, output: Vec<(String, OutputSource)>, status: ExitStatus) -> Result<()> {
match self
.pr
.or(self.pr_arc.as_ref().map(|arc| arc.as_ref().as_ref()))
{
Some(pr) => {
error!("{} failed", self.get_program());
if self.on_stdout.is_none() {
let stdout_only: String = output
.into_iter()
.filter(|(_, source)| matches!(source, OutputSource::Stdout))
.map(|(line, _)| line)
.collect::<Vec<_>>()
.join("\n");
if !stdout_only.trim().is_empty() {
pr.println(stdout_only);
}
}
}
None => {
}
}
Err(ScriptFailed(self.get_program(), Some(status)))?
}
fn get_program(&self) -> String {
display_path(PathBuf::from(self.cmd.get_program()))
}
fn get_args(&self) -> Vec<String> {
self.cmd
.get_args()
.map(|s| s.to_string_lossy().to_string())
.collect::<Vec<_>>()
}
}
impl Display for CmdLineRunner<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let args = self.get_args().join(" ");
write!(f, "{} {args}", self.get_program())
}
}
impl Debug for CmdLineRunner<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let args = self.get_args().join(" ");
write!(f, "{} {args}", self.get_program())
}
}
enum OutputSource {
Stdout,
Stderr,
}
enum ChildProcessOutput {
Stdout(String),
Stderr(String),
ExitStatus(ExitStatus),
#[cfg(not(any(test, target_os = "windows")))]
Signal(i32),
}
#[cfg(test)]
#[cfg(unix)]
mod tests {
use pretty_assertions::assert_eq;
use crate::config::Config;
#[tokio::test]
async fn test_cmd() {
let _config = Config::get().await.unwrap();
let output = cmd!("echo", "foo", "bar").read().unwrap();
assert_eq!("foo bar", output);
}
}