use std::path::PathBuf;
use std::time::Duration;
use anyhow::anyhow;
use crate::protocol::{self, SubprocessOutcome};
/// Parameters describing one test run to execute in a subprocess.
///
/// All string data is borrowed from the caller for the lifetime `'a`.
pub(crate) struct SpawnRequest<'a> {
/// Test name; `OsSubprocessRunner` passes it as the value of `--run-single`.
pub test_name: &'a str,
/// Name of the environment variable that carries the state; `OsSubprocessRunner`
/// also passes it as the value of `--state-env-var`.
pub state_var: &'a str,
/// Value assigned to the `state_var` environment variable of the subprocess.
pub state_json: &'a str,
/// Upper bound on how long to wait for the subprocess; `None` waits without a limit.
pub timeout: Option<Duration>,
}
/// Executes a [`SpawnRequest`] and reports the result as a [`SubprocessOutcome`].
///
/// Implementors must be `Send + Sync + 'static`.
pub(crate) trait SubprocessRunner: Send + Sync + 'static {
/// Runs the request described by `req`.
///
/// Resolves to the outcome of the run, or to an error if the run could not
/// be carried out. The returned future is required to be `Send`.
fn run(
&self,
req: SpawnRequest<'_>,
) -> impl std::future::Future<Output = anyhow::Result<SubprocessOutcome>> + Send;
}
/// [`SubprocessRunner`] that launches a program through `tokio::process::Command`.
pub(crate) struct OsSubprocessRunner {
/// Program executed for every request.
exe: PathBuf,
/// When `true`, only stderr of the subprocess is piped; when `false`, both
/// stdout and stderr are piped and handed to `protocol::decode_outcome`.
no_capture: bool,
}
impl OsSubprocessRunner {
/// Creates a runner that executes `exe`.
///
/// `no_capture` selects which spawn path `run` uses: `spawn_no_capture`
/// when `true`, `spawn_captured` when `false`.
pub(crate) fn new(exe: PathBuf, no_capture: bool) -> Self {
Self { exe, no_capture }
}
}
impl SubprocessRunner for OsSubprocessRunner {
async fn run(&self, req: SpawnRequest<'_>) -> anyhow::Result<SubprocessOutcome> {
let mut cmd = tokio::process::Command::new(&self.exe);
cmd.arg("--run-single")
.arg(req.test_name)
.arg("--state-env-var")
.arg(req.state_var)
.env(req.state_var, req.state_json);
if self.no_capture {
spawn_no_capture(cmd, req.timeout).await
} else {
spawn_captured(cmd, req.timeout).await
}
}
}
/// How long `graceful_kill` waits for the child to exit after `SIGTERM`
/// (Unix only) before falling back to a forced kill.
const KILL_GRACE_PERIOD: Duration = Duration::from_secs(5);
/// Stops `child`, giving it a chance to shut down on its own first.
///
/// On Unix, `SIGTERM` is sent (if the child still reports a PID) and the
/// child gets up to [`KILL_GRACE_PERIOD`] to exit; if it does, the function
/// returns without a forced kill. Otherwise, and on all other platforms,
/// `Child::kill` is called and its result is ignored.
async fn graceful_kill(child: &mut tokio::process::Child) {
    #[cfg(unix)]
    {
        if let Some(pid) = child.id() {
            // SAFETY: `kill` is a plain FFI call taking two integers; it reads
            // or writes no memory owned by this process. The PID comes from
            // `child.id()`, i.e. from a child this handle still tracks.
            unsafe { libc::kill(pid.cast_signed(), libc::SIGTERM) };
        }

        // Race the child's exit against the grace period.
        let exited_in_time = tokio::select! {
            _ = child.wait() => true,
            () = tokio::time::sleep(KILL_GRACE_PERIOD) => false,
        };
        if exited_in_time {
            return;
        }
    }

    // Best effort: a failure to kill is deliberately ignored.
    let _ = child.kill().await;
}
/// Result of waiting on a child process in `wait_or_timeout`.
enum WaitOutcome {
/// The child exited by itself with the given status.
Exited(std::process::ExitStatus),
/// The timeout (the carried duration) elapsed and the child was stopped
/// via `graceful_kill`.
TimedOut(Duration),
}
async fn wait_or_timeout(
child: &mut tokio::process::Child,
timeout: Option<Duration>,
) -> anyhow::Result<WaitOutcome> {
match timeout {
Some(dur) => tokio::select! {
r = child.wait() => r.map(WaitOutcome::Exited).map_err(|e| anyhow!("{e}")),
() = tokio::time::sleep(dur) => {
graceful_kill(child).await;
Ok(WaitOutcome::TimedOut(dur))
}
},
None => child
.wait()
.await
.map(WaitOutcome::Exited)
.map_err(|e| anyhow!("{e}")),
}
}
/// Reads `handle` to end-of-file and returns the bytes as a `String`.
///
/// Invalid UTF-8 is replaced lossily. A missing handle (`None`) yields an
/// empty string. Read errors are ignored: whatever was read before the error
/// is still returned.
async fn drain_pipe<R>(handle: Option<R>) -> String
where
    R: tokio::io::AsyncRead + Unpin,
{
    use tokio::io::AsyncReadExt as _;

    let mut collected = Vec::new();
    if let Some(mut pipe) = handle {
        // Best effort: keep any partial data even if the read fails.
        let _ = pipe.read_to_end(&mut collected).await;
    }
    // An empty buffer converts to an empty `String`, so no special case is needed.
    String::from_utf8_lossy(&collected).into_owned()
}
/// Runs `cmd` with only stderr piped and maps the exit status to an outcome.
///
/// stderr is piped so the skip reason can be decoded from it. It is drained
/// *concurrently* with waiting for the child: a child that writes more than
/// the OS pipe buffer would otherwise block on the write and never exit,
/// which would hang this function or be misreported as a timeout.
///
/// Outcome mapping:
/// * timeout elapsed → `SubprocessOutcome::TimedOut` (returned right after
///   the child has been stopped, without waiting for the pipe to close);
/// * exit code `0` → `Passed`;
/// * exit code `protocol::SKIP_EXIT_CODE` → `Skipped` with the reason
///   decoded from stderr;
/// * anything else → the collected stderr is echoed to this process's
///   stderr and `Failed` is returned with empty `stdout`/`stderr` fields.
///
/// # Errors
///
/// Fails if the subprocess cannot be spawned or waiting on it fails.
async fn spawn_no_capture(
    mut cmd: tokio::process::Command,
    timeout: Option<Duration>,
) -> anyhow::Result<SubprocessOutcome> {
    let mut child = cmd
        .stderr(std::process::Stdio::piped())
        .spawn()
        .map_err(|e| anyhow!("failed to spawn test subprocess: {e}"))?;

    // The drain future owns the pipe, so it does not borrow `child`.
    let drain = drain_pipe(child.stderr.take());
    let wait = wait_or_timeout(&mut child, timeout);
    tokio::pin!(drain);
    tokio::pin!(wait);

    // Drive both futures together so the pipe is emptied while the child runs.
    // Whichever finishes first, the other one is continued (not restarted).
    let (waited, drained) = tokio::select! {
        waited = &mut wait => (waited, None),
        text = &mut drain => (wait.await, Some(text)),
    };

    let status = match waited? {
        WaitOutcome::TimedOut(dur) => return Ok(SubprocessOutcome::TimedOut(dur)),
        WaitOutcome::Exited(s) => s,
    };
    // The child exited first: finish reading whatever is left in the pipe.
    let stderr = match drained {
        Some(text) => text,
        None => drain.await,
    };

    match status.code() {
        // NOTE(review): stderr of a passing run is discarded here — confirm intended.
        Some(0) => Ok(SubprocessOutcome::Passed),
        Some(c) if c == protocol::SKIP_EXIT_CODE => Ok(SubprocessOutcome::Skipped(
            protocol::decode_skip_reason(&stderr),
        )),
        code => {
            // stderr was intercepted above, so forward it now.
            eprint!("{stderr}");
            Ok(SubprocessOutcome::Failed {
                reason: protocol::exit_code_reason(code),
                stdout: String::new(),
                stderr: String::new(),
            })
        }
    }
}
/// Runs `cmd` with stdout and stderr piped and decodes the result via
/// `protocol::decode_outcome`.
///
/// Both pipes are drained *concurrently* with waiting for the child. Reading
/// them only after the child has exited deadlocks as soon as the child
/// writes more than the OS pipe buffer: the child blocks on the write and
/// never exits, so the wait hangs or is misreported as a timeout.
///
/// If the timeout elapses, `SubprocessOutcome::TimedOut` is returned right
/// after the child has been stopped, without waiting for the pipes to close.
///
/// # Errors
///
/// Fails if the subprocess cannot be spawned or waiting on it fails.
async fn spawn_captured(
    mut cmd: tokio::process::Command,
    timeout: Option<Duration>,
) -> anyhow::Result<SubprocessOutcome> {
    let mut child = cmd
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .map_err(|e| anyhow!("failed to spawn test subprocess: {e}"))?;

    // Move the pipes out first so the drain future does not borrow `child`.
    let stdout_pipe = child.stdout.take();
    let stderr_pipe = child.stderr.take();
    let drain = async move { tokio::join!(drain_pipe(stdout_pipe), drain_pipe(stderr_pipe)) };
    let wait = wait_or_timeout(&mut child, timeout);
    tokio::pin!(drain);
    tokio::pin!(wait);

    // Drive both futures together so the pipes are emptied while the child
    // runs. Whichever finishes first, the other one is continued afterwards.
    let (waited, captured) = tokio::select! {
        waited = &mut wait => (waited, None),
        output = &mut drain => (wait.await, Some(output)),
    };

    let status = match waited? {
        WaitOutcome::TimedOut(dur) => return Ok(SubprocessOutcome::TimedOut(dur)),
        WaitOutcome::Exited(s) => s,
    };
    // The child exited first: finish reading whatever is left in the pipes.
    let (stdout, stderr) = match captured {
        Some(output) => output,
        None => drain.await,
    };

    Ok(protocol::decode_outcome(status.code(), stdout, stderr))
}