use std::collections::VecDeque;
use std::ffi::CString;
use std::io::{self, BufRead, BufReader};
use std::os::unix::process::CommandExt;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use duct::Expression;
const SECRET_OPTIONS: &[&str] = &["--password", "--passphrase", "--secret", "--token"];
const REDACTED: &str = "<redacted>";
pub type LogFn = Box<dyn FnMut(String) + Send>;
const OUTPUT_TAIL_LINES: usize = 12;
fn format_argv(argv: &[String]) -> String {
let mut parts: Vec<String> = Vec::with_capacity(argv.len());
let mut redact_next = false;
for arg in argv {
if redact_next {
parts.push(REDACTED.to_string());
redact_next = false;
continue;
}
if SECRET_OPTIONS.contains(&arg.as_str()) {
parts.push(arg.clone());
redact_next = true;
continue;
}
if let Some((name, _value)) = arg.split_once('=') {
if SECRET_OPTIONS.contains(&name) {
parts.push(format!("{name}={REDACTED}"));
continue;
}
}
parts.push(arg.clone());
}
parts.join(" ")
}
fn push_tail(tail: &mut VecDeque<String>, line: String) {
if line.trim().is_empty() {
return;
}
if tail.len() >= OUTPUT_TAIL_LINES {
tail.pop_front();
}
tail.push_back(line);
}
fn build_expression(
argv: &[String],
env: Option<&[(String, String)]>,
chroot_dir: Option<&str>,
) -> Expression {
let mut expr = duct::cmd(&argv[0], &argv[1..])
.stderr_to_stdout()
.stdin_null()
.unchecked();
if let Some(pairs) = env {
for (k, v) in pairs {
expr = expr.env(k, v);
}
}
let chroot_dir = chroot_dir.map(str::to_string);
expr.before_spawn(move |command| {
let chroot_c = chroot_dir
.as_deref()
.map(CString::new)
.transpose()
.map_err(|_| io::Error::other("chroot path contains an interior NUL"))?;
unsafe {
command.pre_exec(move || {
if let Some(ref dir) = chroot_c {
if libc::chroot(dir.as_ptr()) != 0 {
return Err(io::Error::last_os_error());
}
if libc::chdir(c"/".as_ptr()) != 0 {
return Err(io::Error::last_os_error());
}
}
libc::setsid();
Ok(())
});
}
Ok(())
})
}
pub struct Cmd<'a> {
log: &'a mut LogFn,
argv: Vec<String>,
env: Option<&'a [(String, String)]>,
heartbeat: Option<(Duration, &'a str)>,
chroot_dir: Option<&'a str>,
}
impl<'a> Cmd<'a> {
pub fn new(log: &'a mut LogFn, argv: Vec<String>) -> Self {
Cmd {
log,
argv,
env: None,
heartbeat: None,
chroot_dir: None,
}
}
#[must_use]
pub fn env(mut self, env: &'a [(String, String)]) -> Self {
self.env = Some(env);
self
}
#[must_use]
pub fn heartbeat(mut self, interval: Duration, msg: &'a str) -> Self {
self.heartbeat = Some((interval, msg));
self
}
#[must_use]
pub fn chroot(mut self, dir: &'a str) -> Self {
self.chroot_dir = Some(dir);
self
}
pub fn run(self) -> Result<()> {
let display = format_argv(&self.argv);
(self.log)(format!("$ {display}"));
let reader = build_expression(&self.argv, self.env, self.chroot_dir)
.reader()
.with_context(|| format!("failed to spawn {}", self.argv[0]))?;
let (tx, rx) = mpsc::channel::<String>();
let reader_thread = std::thread::spawn(move || -> i32 {
for line in BufReader::new(&reader)
.lines()
.map_while(std::result::Result::ok)
{
if tx.send(line).is_err() {
break;
}
}
match reader.try_wait() {
Ok(Some(output)) => output.status.code().unwrap_or(-1),
_ => -1,
}
});
let started = Instant::now();
let mut last_heartbeat = Instant::now();
let mut output_tail = VecDeque::new();
loop {
match rx.recv_timeout(Duration::from_millis(200)) {
Ok(line) => {
let line = crate::util::clean_log_line(&line);
if !line.is_empty() {
push_tail(&mut output_tail, line.clone());
(self.log)(line);
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
if let Some((interval, msg)) = self.heartbeat {
if last_heartbeat.elapsed() >= interval {
let elapsed = started.elapsed().as_secs();
(self.log)(format!("{msg} ({elapsed}s elapsed)"));
last_heartbeat = Instant::now();
}
}
}
let rc = reader_thread.join().unwrap_or(-1);
if rc != 0 {
if let Some(last_line) = output_tail.back() {
bail!("command failed (exit {rc}): {last_line}\ncommand: {display}");
}
bail!("command failed (exit {rc}): {display}");
}
Ok(())
}
}