use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::{Arc, Mutex, PoisonError};
use tokio::sync::Notify;
use crate::process::{
ExitStatus, Process, ProcessError, ProcessId, ProcessSpawner, Signal, SpawnPlan, Spawned,
};
/// Scripted lifecycle of a mock process: decides when `wait` resolves and
/// which exit code it reports.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum MockBehaviour {
/// `wait` resolves right away with the spec's `exit_code`, whatever signals
/// have or have not been sent. This is the default.
#[default]
Immediate,
/// `wait` stays pending until `Signal::Terminate` or `Signal::Kill` has been
/// sent. A terminate reports `graceful_exit_code`; a kill reports the spec's
/// `exit_code` instead. `Signal::Interrupt` does not end the process.
OnTerminate {
/// Exit code reported when the process ends because of `Signal::Terminate`.
graceful_exit_code: i32,
},
/// `wait` stays pending until `Signal::Kill` has been sent, then reports the
/// spec's `exit_code`. `Signal::Terminate` and `Signal::Interrupt` are
/// recorded but do not end the process.
OnKillOnly,
}
/// Canned description of one mock process. Specs are queued on a
/// [`MockProcessSpawner`] and consumed one per `spawn` call.
#[derive(Debug, Clone)]
pub struct MockSpec {
/// Bytes served as the spawned process's stdout stream.
pub stdout: Vec<u8>,
/// Bytes served as the spawned process's stderr stream.
pub stderr: Vec<u8>,
/// Exit code reported by `wait`, except when an `OnTerminate` process ends
/// through `Signal::Terminate` (which reports its `graceful_exit_code`).
pub exit_code: i32,
/// Value returned by the process's `id()` until `wait` completes; `None`
/// makes the process report no id from the start.
pub pid: Option<ProcessId>,
/// When `wait` is allowed to resolve; see [`MockBehaviour`].
pub behaviour: MockBehaviour,
}
impl Default for MockSpec {
    /// Spec for a process that produces no output and exits successfully at
    /// once: empty stdout/stderr, exit code `0`, pid `ProcessId(1)` and the
    /// default [`MockBehaviour`].
    fn default() -> Self {
        Self {
            pid: Some(ProcessId(1)),
            exit_code: 0,
            behaviour: MockBehaviour::default(),
            stdout: vec![],
            stderr: vec![],
        }
    }
}
/// One entry in the spawner's history: what a single `spawn` call was asked
/// to run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpawnRecord {
/// Copy of the plan that was passed to `spawn`.
pub plan: SpawnPlan,
}
/// Mutable bookkeeping shared by every clone of a [`MockProcessSpawner`].
#[derive(Debug, Default)]
struct SharedState {
/// Specs waiting to be consumed; `spawn` takes them from the front.
specs: VecDeque<MockSpec>,
/// Plans of all accepted spawns, oldest first.
spawns: Vec<SpawnRecord>,
/// Signal log of each spawned process. `spawn` pushes to this and to
/// `spawns` together, so index `i` here belongs to `spawns[i]`.
signal_logs: Vec<Arc<Mutex<Vec<Signal>>>>,
}
/// In-memory [`ProcessSpawner`] that hands out scripted [`MockProcess`]es
/// instead of starting real programs.
///
/// Cloning is cheap and every clone shares the same spec queue and history.
#[derive(Debug, Default, Clone)]
pub struct MockProcessSpawner {
/// Spec queue, spawn history and signal logs, shared between clones.
state: Arc<Mutex<SharedState>>,
}
impl MockProcessSpawner {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn push_spec(&self, spec: MockSpec) {
self.state
.lock()
.unwrap_or_else(PoisonError::into_inner)
.specs
.push_back(spec);
}
#[must_use]
pub fn spawns(&self) -> Vec<SpawnRecord> {
self.state
.lock()
.unwrap_or_else(PoisonError::into_inner)
.spawns
.clone()
}
#[must_use]
pub fn signals_for(&self, index: usize) -> Option<Vec<Signal>> {
let state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
let log = state.signal_logs.get(index)?;
let snapshot = log.lock().unwrap_or_else(PoisonError::into_inner).clone();
Some(snapshot)
}
}
impl ProcessSpawner for MockProcessSpawner {
type Process = MockProcess;
async fn spawn(&self, plan: &SpawnPlan) -> Result<Spawned<Self::Process>, ProcessError> {
if !plan.cwd.is_absolute() {
return Err(ProcessError::NonAbsoluteCwd {
cwd: plan.cwd.clone(),
});
}
let signal_log = Arc::new(Mutex::new(Vec::new()));
let spec = {
let mut guard = self.state.lock().map_err(|_| ProcessError::SpawnFailed {
program: plan.program.clone(),
source: std::io::Error::other("mock spawner internal state lock poisoned"),
})?;
guard.spawns.push(SpawnRecord { plan: plan.clone() });
guard.signal_logs.push(signal_log.clone());
guard.specs.pop_front().unwrap_or_default()
};
Ok(Spawned {
process: MockProcess {
pid: spec.pid,
exit_code: spec.exit_code,
behaviour: spec.behaviour,
signals: signal_log,
signal_state: SignalState::Pending,
wake: Arc::new(Notify::new()),
},
stdout: Cursor::new(spec.stdout),
stderr: Cursor::new(spec.stderr),
})
}
}
/// Summary of the terminate/kill signals a mock process has received so far.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalState {
/// Neither `Signal::Terminate` nor `Signal::Kill` has been sent yet.
Pending,
/// `Signal::Terminate` was sent while still `Pending`, and no
/// `Signal::Kill` has followed.
Terminated,
/// `Signal::Kill` has been sent; a later `Signal::Terminate` does not
/// change this state.
Killed,
}
/// Scripted stand-in for a running child process, created by
/// [`MockProcessSpawner`].
#[derive(Debug)]
pub struct MockProcess {
/// Id reported by `id()`; cleared once `wait` has completed.
pid: Option<ProcessId>,
/// Exit code from the spec (see `poll_exit_code` for when it applies).
exit_code: i32,
/// Decides which signals, if any, let `wait` resolve.
behaviour: MockBehaviour,
/// Every signal sent to this process, in order. The same log is also held
/// by the spawner that created the process.
signals: Arc<Mutex<Vec<Signal>>>,
/// Terminate/kill state derived from the signals sent so far.
signal_state: SignalState,
/// Notified on every `send_signal`; `wait` parks on it while pending.
wake: Arc<Notify>,
}
impl MockProcess {
    /// Returns a snapshot of every signal sent to this process so far, in
    /// the order they were sent.
    #[must_use]
    pub fn signals(&self) -> Vec<Signal> {
        // A poisoned log is still readable, so recover it instead of panicking.
        let log = self.signals.lock().unwrap_or_else(PoisonError::into_inner);
        log.to_vec()
    }
}
impl Process for MockProcess {
    type Stdout = Cursor<Vec<u8>>;
    type Stderr = Cursor<Vec<u8>>;

    /// Returns the pid taken from the spec, or `None` once `wait` has
    /// completed (or when the spec carried no pid).
    fn id(&self) -> Option<ProcessId> {
        self.pid
    }

    /// Appends `signal` to the log, updates the terminate/kill state and
    /// notifies anything parked on the wake handle. Always returns `Ok`.
    fn send_signal(&mut self, signal: Signal) -> Result<(), ProcessError> {
        {
            let mut log = self.signals.lock().unwrap_or_else(PoisonError::into_inner);
            log.push(signal);
        }

        self.signal_state = match (signal, self.signal_state) {
            // A kill always wins, even after a terminate.
            (Signal::Kill, _) => SignalState::Killed,
            // Only the first terminate on an untouched process counts.
            (Signal::Terminate, SignalState::Pending) => SignalState::Terminated,
            // Repeated terminates and all interrupts leave the state alone.
            (Signal::Terminate | Signal::Interrupt, unchanged) => unchanged,
        };

        self.wake.notify_waiters();
        Ok(())
    }

    /// Resolves once the configured behaviour and the signals received so
    /// far yield an exit code, then clears the pid and returns a synthetic
    /// [`ExitStatus`] for that code. Until then the future stays pending.
    async fn wait(&mut self) -> Result<ExitStatus, ProcessError> {
        let code = loop {
            match self.poll_exit_code() {
                Some(code) => break code,
                None => {
                    // Park until the next notification, then re-check.
                    let wake = Arc::clone(&self.wake);
                    wake.notified().await;
                }
            }
        };

        self.pid = None;
        Ok(synthetic_exit_status(code))
    }
}
impl MockProcess {
    /// Returns the exit code the process would report right now, or `None`
    /// while the configured behaviour is still waiting for a signal.
    fn poll_exit_code(&self) -> Option<i32> {
        match self.behaviour {
            // Exits on its own; signals are irrelevant.
            MockBehaviour::Immediate => Some(self.exit_code),
            MockBehaviour::OnTerminate { graceful_exit_code } => match self.signal_state {
                SignalState::Pending => None,
                SignalState::Terminated => Some(graceful_exit_code),
                SignalState::Killed => Some(self.exit_code),
            },
            // Only a kill ends this process.
            MockBehaviour::OnKillOnly => match self.signal_state {
                SignalState::Killed => Some(self.exit_code),
                SignalState::Pending | SignalState::Terminated => None,
            },
        }
    }
}
/// Builds an [`ExitStatus`] that reports `code` as a normal exit.
///
/// Unix variant: `from_raw` takes a raw wait status, so the low eight bits of
/// `code` are placed where a wait status carries the exit code. Higher bits
/// of `code` are discarded.
#[cfg(unix)]
fn synthetic_exit_status(code: i32) -> ExitStatus {
    use std::os::unix::process::ExitStatusExt;

    const EXIT_CODE_MASK: i32 = 0xff;
    const EXIT_CODE_SHIFT: u32 = 8;

    let exit_byte = code & EXIT_CODE_MASK;
    ExitStatus::from_raw(exit_byte << EXIT_CODE_SHIFT)
}

/// Builds an [`ExitStatus`] that reports `code` as the process exit code.
///
/// Non-Unix variant (relies on the Windows extension trait): `from_raw` takes
/// the exit code as a `u32`, so `code` is passed through with its bit pattern
/// unchanged.
#[cfg(not(unix))]
fn synthetic_exit_status(code: i32) -> ExitStatus {
    use std::os::windows::process::ExitStatusExt;

    // Reinterpret the bits rather than convert the value, so negative codes
    // keep the same 32-bit pattern.
    let raw = u32::from_ne_bytes(code.to_ne_bytes());
    ExitStatus::from_raw(raw)
}
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::path::PathBuf;
    use std::time::Duration;

    use tokio::io::{AsyncRead, AsyncReadExt};

    use super::*;

    /// How long the "still pending" checks let `wait` run before concluding
    /// that it is blocked.
    const PENDING_GRACE: Duration = Duration::from_millis(20);

    /// Builds an absolute path by appending `path` to the system temp dir.
    fn abs(path: &str) -> PathBuf {
        std::env::temp_dir().join(path)
    }

    /// Minimal valid plan for `program`: no args, no env, absolute cwd.
    fn plan(program: &str) -> SpawnPlan {
        SpawnPlan {
            program: OsString::from(program),
            args: vec![],
            env: vec![],
            cwd: abs("haz-exec-mock-tests"),
        }
    }

    /// Reads `reader` to the end and returns everything it produced.
    async fn drain<R>(reader: &mut R) -> Vec<u8>
    where
        R: AsyncRead + Unpin,
    {
        let mut bytes = Vec::new();
        reader.read_to_end(&mut bytes).await.unwrap();
        bytes
    }

    /// Panics with `failure` if `process.wait()` resolves within
    /// `PENDING_GRACE`; otherwise drops the wait future and returns.
    async fn assert_wait_stays_pending(process: &mut MockProcess, failure: &str) {
        let wait_fut = process.wait();
        let timeout = tokio::time::sleep(PENDING_GRACE);
        tokio::pin!(wait_fut, timeout);
        tokio::select! {
            // Poll the timer first so a ready timer wins any tie.
            biased;
            () = &mut timeout => {}
            _ = &mut wait_fut => panic!("{failure}"),
        }
    }

    /// Spawn history lists plans in the order `spawn` was called.
    #[tokio::test]
    async fn spawn_records_plan_in_chronological_order() {
        let spawner = MockProcessSpawner::new();
        for program in ["first", "second"] {
            let _ = spawner.spawn(&plan(program)).await.unwrap();
        }

        let programs: Vec<OsString> = spawner
            .spawns()
            .iter()
            .map(|record| record.plan.program.clone())
            .collect();
        assert_eq!(programs, [OsString::from("first"), OsString::from("second")]);
    }

    /// With nothing queued, the spawned process has empty streams and exits 0.
    #[tokio::test]
    async fn spawn_uses_default_spec_when_queue_empty() {
        let spawner = MockProcessSpawner::new();
        let mut child = spawner.spawn(&plan("noop")).await.unwrap();

        let out = drain(&mut child.stdout).await;
        let err = drain(&mut child.stderr).await;
        let status = child.process.wait().await.unwrap();

        assert!(out.is_empty());
        assert!(err.is_empty());
        assert!(status.success());
        #[cfg(unix)]
        assert_eq!(status.code(), Some(0));
    }

    /// Queued specs are consumed oldest-first.
    #[tokio::test]
    async fn spawn_pops_specs_in_fifo_order() {
        let spawner = MockProcessSpawner::new();
        for payload in [&b"first-out"[..], &b"second-out"[..]] {
            spawner.push_spec(MockSpec {
                stdout: payload.to_vec(),
                ..Default::default()
            });
        }

        let mut first = spawner.spawn(&plan("a")).await.unwrap();
        let mut second = spawner.spawn(&plan("b")).await.unwrap();

        let first_out = drain(&mut first.stdout).await;
        let second_out = drain(&mut second.stdout).await;
        assert_eq!(first_out, b"first-out");
        assert_eq!(second_out, b"second-out");
    }

    /// The spec's stdout/stderr bytes come back through the spawned streams.
    #[tokio::test]
    async fn spawn_yields_canned_streams() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            stdout: b"out-bytes".to_vec(),
            stderr: b"err-bytes".to_vec(),
            ..Default::default()
        });

        let mut child = spawner.spawn(&plan("p")).await.unwrap();
        let out = drain(&mut child.stdout).await;
        let err = drain(&mut child.stderr).await;

        assert_eq!(out, b"out-bytes");
        assert_eq!(err, b"err-bytes");
    }

    /// `wait` reports the exit code configured in the spec.
    #[tokio::test]
    async fn wait_yields_canned_exit_code() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            exit_code: 42,
            ..Default::default()
        });

        let mut child = spawner.spawn(&plan("p")).await.unwrap();
        let status = child.process.wait().await.unwrap();

        assert!(!status.success());
        assert_eq!(status.code(), Some(42));
    }

    /// The pid is visible before `wait` and gone afterwards.
    #[tokio::test]
    async fn wait_clears_pid() {
        let expected = ProcessId(12_345);
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            pid: Some(expected),
            ..Default::default()
        });

        let mut child = spawner.spawn(&plan("p")).await.unwrap();
        assert_eq!(child.process.id(), Some(ProcessId(12_345)));

        let _ = child.process.wait().await.unwrap();
        assert_eq!(child.process.id(), None);
    }

    /// A spec with `pid: None` yields a process that never reports an id.
    #[tokio::test]
    async fn id_propagates_explicit_none_from_spec() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            pid: None,
            ..Default::default()
        });

        let child = spawner.spawn(&plan("p")).await.unwrap();
        assert_eq!(child.process.id(), None);
    }

    /// Every signal is logged, in the order it was sent.
    #[tokio::test]
    async fn send_signal_records_in_order() {
        let spawner = MockProcessSpawner::new();
        let mut child = spawner.spawn(&plan("p")).await.unwrap();

        for signal in [Signal::Terminate, Signal::Interrupt, Signal::Kill] {
            child.process.send_signal(signal).unwrap();
        }

        assert_eq!(
            child.process.signals(),
            [Signal::Terminate, Signal::Interrupt, Signal::Kill]
        );
    }

    /// A relative cwd is rejected and leaves no trace in the spawn history.
    #[tokio::test]
    async fn spawn_rejects_relative_cwd() {
        let spawner = MockProcessSpawner::new();
        let relative = SpawnPlan {
            program: OsString::from("p"),
            args: vec![],
            env: vec![],
            cwd: PathBuf::from("relative/path"),
        };

        let outcome = spawner.spawn(&relative).await;
        match outcome {
            Ok(_) => panic!("expected NonAbsoluteCwd, got success"),
            Err(ProcessError::NonAbsoluteCwd { cwd }) => {
                assert_eq!(cwd, PathBuf::from("relative/path"));
            }
            Err(other) => panic!("expected NonAbsoluteCwd, got {other:?}"),
        }
        assert!(spawner.spawns().is_empty());
    }

    /// Clones of a spawner see the same spec queue and spawn history.
    #[tokio::test]
    async fn cloned_spawner_shares_state() {
        let original = MockProcessSpawner::new();
        let clone = original.clone();
        original.push_spec(MockSpec {
            stdout: b"shared".to_vec(),
            ..Default::default()
        });

        let mut child = clone.spawn(&plan("p")).await.unwrap();
        let out = drain(&mut child.stdout).await;

        assert_eq!(out, b"shared");
        assert_eq!(original.spawns().len(), 1);
        assert_eq!(clone.spawns().len(), 1);
    }

    /// The recorded plan equals the submitted one, args and env included.
    #[tokio::test]
    async fn spawn_records_full_plan_including_args_and_env() {
        let spawner = MockProcessSpawner::new();
        let submitted = SpawnPlan {
            program: OsString::from("p"),
            args: vec![OsString::from("a"), OsString::from("b")],
            env: vec![(OsString::from("K"), OsString::from("V"))],
            cwd: abs("captured"),
        };

        let _ = spawner.spawn(&submitted).await.unwrap();

        let history = spawner.spawns();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].plan, submitted);
    }

    /// `OnTerminate`: `wait` blocks until a terminate arrives, then reports
    /// the graceful exit code rather than the spec's `exit_code`.
    #[tokio::test]
    async fn wait_with_exit_on_terminate_blocks_until_sigterm_arrives() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            behaviour: MockBehaviour::OnTerminate {
                graceful_exit_code: 5,
            },
            exit_code: 99,
            ..Default::default()
        });
        let mut child = spawner.spawn(&plan("p")).await.unwrap();

        assert_wait_stays_pending(
            &mut child.process,
            "wait should not resolve before a signal",
        )
        .await;

        child.process.send_signal(Signal::Terminate).unwrap();
        let status = child.process.wait().await.unwrap();

        assert!(!status.success());
        assert_eq!(status.code(), Some(5));
        assert_eq!(child.process.signals(), [Signal::Terminate]);
    }

    /// `OnTerminate`: a kill also ends the process, with the spec's
    /// `exit_code`.
    #[tokio::test]
    async fn wait_with_exit_on_terminate_completes_on_sigkill_too() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            behaviour: MockBehaviour::OnTerminate {
                graceful_exit_code: 5,
            },
            exit_code: 137,
            ..Default::default()
        });
        let mut child = spawner.spawn(&plan("p")).await.unwrap();

        child.process.send_signal(Signal::Kill).unwrap();
        let status = child.process.wait().await.unwrap();

        assert_eq!(status.code(), Some(137));
    }

    /// `OnKillOnly`: a terminate is logged but does not end the process;
    /// only the kill does.
    #[tokio::test]
    async fn wait_with_exit_on_kill_only_ignores_sigterm_until_sigkill() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            behaviour: MockBehaviour::OnKillOnly,
            exit_code: 137,
            ..Default::default()
        });
        let mut child = spawner.spawn(&plan("p")).await.unwrap();

        child.process.send_signal(Signal::Terminate).unwrap();
        assert_wait_stays_pending(
            &mut child.process,
            "wait should not resolve on SIGTERM in OnKillOnly",
        )
        .await;

        child.process.send_signal(Signal::Kill).unwrap();
        let status = child.process.wait().await.unwrap();

        assert_eq!(status.code(), Some(137));
        assert_eq!(child.process.signals(), [Signal::Terminate, Signal::Kill]);
    }

    /// `Immediate`: `wait` resolves without any signal being sent.
    #[tokio::test]
    async fn wait_exit_immediately_returns_without_signal() {
        let spawner = MockProcessSpawner::new();
        spawner.push_spec(MockSpec {
            behaviour: MockBehaviour::Immediate,
            exit_code: 0,
            ..Default::default()
        });

        let mut child = spawner.spawn(&plan("p")).await.unwrap();
        let status = child.process.wait().await.unwrap();

        assert!(status.success());
    }
}