use std::ffi::{OsStr, OsString};
use std::path::PathBuf;
use std::sync::Mutex;
use crate::command::Command;
use crate::error::Result;
use crate::result::{Outcome, ProcessResult};
use crate::runner::ProcessRunner;
/// A canned response served in place of running a real process.
///
/// Values are built with the constructors (`ok`, `fail`, `timeout`, `signalled`, `pending`,
/// `lines`) and adjusted with the `with_*` builders.
#[derive(Debug, Clone)]
pub struct Reply {
// Text served as the process's standard output.
stdout: String,
// Text served as the process's standard error.
stderr: String,
// Exit code reported when the reply is neither timed out nor signalled.
code: i32,
// When set, the reply is reported as `Outcome::TimedOut` (checked before `signalled`).
timed_out: bool,
// When set (and `timed_out` is not), the reply is reported as `Outcome::Signalled(signal)`.
signalled: bool,
// Optional signal number carried by a signalled reply.
signal: Option<i32>,
// When set, `output_string` parks instead of returning and a started process gets no lifetime.
pending: bool,
// Optional per-line delay; used to size a started process's lifetime and forwarded to it.
line_delay: Option<std::time::Duration>,
}
impl Reply {
    /// Neutral starting point shared by every constructor: empty output, exit code 0, no
    /// timeout, no signal, not pending and no per-line delay.
    fn blank() -> Self {
        Self {
            stdout: String::new(),
            stderr: String::new(),
            code: 0,
            timed_out: false,
            signalled: false,
            signal: None,
            pending: false,
            line_delay: None,
        }
    }

    /// A reply that exits with code 0 and serves `stdout` as its standard output.
    pub fn ok(stdout: impl Into<String>) -> Self {
        Self {
            stdout: stdout.into(),
            ..Self::blank()
        }
    }

    /// A reply that exits with `code` and serves `stderr` as its standard error.
    pub fn fail(code: i32, stderr: impl Into<String>) -> Self {
        Self {
            stderr: stderr.into(),
            code,
            ..Self::blank()
        }
    }

    /// A reply flagged as timed out; it carries no output.
    pub fn timeout() -> Self {
        Self {
            timed_out: true,
            ..Self::blank()
        }
    }

    /// A reply flagged as terminated by a signal, optionally carrying the signal number.
    pub fn signalled(signal: Option<i32>) -> Self {
        Self {
            signalled: true,
            signal,
            ..Self::blank()
        }
    }

    /// A reply flagged as pending: `output_string` parks on it until the command's
    /// cancellation token fires, and a started process is given no scripted lifetime.
    pub fn pending() -> Self {
        Self {
            pending: true,
            ..Self::blank()
        }
    }

    /// A successful reply whose stdout consists of `lines`, each terminated by `\n`.
    ///
    /// An empty iterator yields empty stdout. Every yielded line — including a single
    /// empty one — gets its own terminator, so `lines([""])` serves one blank line rather
    /// than no output at all.
    pub fn lines<I, S>(lines: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut text = String::new();
        for line in lines {
            text.push_str(&line.into());
            text.push('\n');
        }
        Self::ok(text)
    }

    /// Sets the per-line delay used when this reply is served through `start`.
    pub fn with_line_delay(mut self, delay: std::time::Duration) -> Self {
        self.line_delay = Some(delay);
        self
    }

    /// Replaces the canned standard output, leaving every other field untouched.
    pub fn with_stdout(mut self, stdout: impl Into<String>) -> Self {
        self.stdout = stdout.into();
        self
    }

    /// Converts the reply into a scripted `RunningProcess` for `command`.
    ///
    /// A pending reply gets no lifetime. Otherwise the lifetime is the per-line delay
    /// multiplied by the longer of the two streams' line counts, capped at
    /// `crate::MAX_DEADLINE`. The exit code is only forwarded when the reply is neither
    /// timed out nor signalled.
    fn into_running(self, command: &Command) -> crate::RunningProcess {
        let lifetime = if self.pending {
            None
        } else {
            let per_line = self.line_delay.unwrap_or_default();
            let longest = self.stdout.lines().count().max(self.stderr.lines().count());
            // Saturate instead of wrapping: a truncating cast would shorten the lifetime
            // of a script with more than `u32::MAX` lines.
            let longest = u32::try_from(longest).unwrap_or(u32::MAX);
            Some(per_line.saturating_mul(longest).min(crate::MAX_DEADLINE))
        };
        let code_for_scripted = if self.timed_out || self.signalled {
            None
        } else {
            Some(self.code)
        };
        let scripted = crate::running::ScriptedProc::new(
            self.stdout,
            self.stderr,
            code_for_scripted,
            self.timed_out,
            self.signal,
            lifetime,
            self.line_delay,
        );
        crate::RunningProcess::from_scripted(command, scripted)
    }

    /// Converts the reply into a finished `ProcessResult` for `program`.
    ///
    /// The outcome is `TimedOut` when the timeout flag is set, otherwise `Signalled` when
    /// the signalled flag is set, otherwise `Exited` with the canned code.
    fn into_result(
        self,
        program: String,
        timeout: Option<std::time::Duration>,
    ) -> ProcessResult<String> {
        let outcome = if self.timed_out {
            Outcome::TimedOut
        } else if self.signalled {
            Outcome::Signalled(self.signal)
        } else {
            Outcome::Exited(self.code)
        };
        ProcessResult::new(program, self.stdout, self.stderr, outcome, timeout)
    }
}
// Boxed, thread-safe matcher closure deciding whether a rule applies to a command.
type Predicate = Box<dyn Fn(&Command) -> bool + Send + Sync>;
// How a scripted rule decides that it applies to a command.
enum Rule {
// Program name followed by the leading arguments the command must start with.
Prefix(Vec<OsString>),
// Arbitrary caller-supplied matcher.
Predicate(Predicate),
}
impl Rule {
    /// Reports whether this rule applies to `command`.
    ///
    /// A prefix rule compares its first element with the program and requires the
    /// remaining elements to be a leading run of the arguments; an empty prefix matches
    /// every command. A predicate rule defers to its closure.
    fn matches(&self, command: &Command) -> bool {
        match self {
            Rule::Predicate(accepts) => accepts(command),
            Rule::Prefix(expected) => {
                let Some((program, args)) = expected.split_first() else {
                    return true;
                };
                command.program() == program.as_os_str() && command.arguments().starts_with(args)
            }
        }
    }
}
/// Copies every element of `prefix` into an owned `OsString`, preserving order.
fn collect_prefix<I, S>(prefix: I) -> Vec<OsString>
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>,
{
    let mut owned = Vec::new();
    for element in prefix {
        let borrowed: &OsStr = element.as_ref();
        owned.push(borrowed.to_owned());
    }
    owned
}
// One registered rule together with the replies it serves.
struct RuleEntry {
// Matcher deciding whether this entry applies to a command.
rule: Rule,
// Replies served in order; once exhausted, the last one is served again.
replies: Vec<Reply>,
// Number of times this entry has matched so far; used to pick the next reply.
next: std::sync::atomic::AtomicUsize,
}
/// A `ProcessRunner` that answers commands from registered rules instead of spawning them.
///
/// Rules are consulted in registration order and the first match wins; when none matches,
/// the optional fallback reply is used.
#[derive(Default)]
pub struct ScriptedRunner {
// Registered rules, in the order they are consulted.
rules: Vec<RuleEntry>,
// Reply served when no rule matches; without it an unmatched command is an error.
fallback: Option<Reply>,
}
impl std::fmt::Debug for ScriptedRunner {
    /// Prints only the number of rules and whether a fallback is set; the rules
    /// themselves (which may hold closures) are not rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rule_count = self.rules.len();
        let has_fallback = self.fallback.is_some();
        let mut out = f.debug_struct("ScriptedRunner");
        out.field("rules", &rule_count);
        out.field("has_fallback", &has_fallback);
        out.finish_non_exhaustive()
    }
}
impl ScriptedRunner {
pub fn new() -> Self {
Self::default()
}
pub fn on<I, S>(mut self, prefix: I, reply: Reply) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
let prefix = collect_prefix(prefix);
self.push_rule(Rule::Prefix(prefix), vec![reply]);
self
}
pub fn on_sequence<I, S, R>(mut self, prefix: I, replies: R) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
R: IntoIterator<Item = Reply>,
{
let prefix = collect_prefix(prefix);
let replies: Vec<Reply> = replies.into_iter().collect();
assert!(
!replies.is_empty(),
"ScriptedRunner::on_sequence needs at least one reply"
);
self.push_rule(Rule::Prefix(prefix), replies);
self
}
pub fn when<F>(mut self, predicate: F, reply: Reply) -> Self
where
F: Fn(&Command) -> bool + Send + Sync + 'static,
{
self.push_rule(Rule::Predicate(Box::new(predicate)), vec![reply]);
self
}
pub fn fallback(mut self, reply: Reply) -> Self {
self.fallback = Some(reply);
self
}
fn push_rule(&mut self, rule: Rule, replies: Vec<Reply>) {
debug_assert!(
!replies.is_empty(),
"a ScriptedRunner rule needs at least one reply"
);
#[cfg(feature = "tracing")]
if let Rule::Prefix(new) = &rule {
for (i, existing) in self.rules.iter().enumerate() {
if let Rule::Prefix(earlier) = &existing.rule
&& new.starts_with(earlier)
{
tracing::warn!(
target: "processkit",
"ScriptedRunner: rule #{} is unreachable — shadowed by the broader \
earlier rule #{}; register more-specific rules first",
self.rules.len(),
i,
);
break;
}
}
}
self.rules.push(RuleEntry {
rule,
replies,
next: std::sync::atomic::AtomicUsize::new(0),
});
}
fn matched_reply(&self, command: &Command, program: &str) -> Result<&Reply> {
for entry in &self.rules {
if entry.rule.matches(command) {
let i = entry
.next
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(&entry.replies[i.min(entry.replies.len() - 1)]);
}
}
self.fallback
.as_ref()
.ok_or_else(|| crate::error::Error::Spawn {
program: program.to_owned(),
source: std::io::Error::new(
std::io::ErrorKind::NotFound,
"ScriptedRunner: no rule matched and no fallback set",
),
})
}
}
/// Feeds the canned output to the command's line handlers: every stdout line first, then
/// every stderr line, each call isolated against panics.
pub(crate) fn replay_line_handlers(command: &Command, stdout: &str, stderr: &str) {
    let mut on_stdout = command.stdout_handler();
    stdout
        .lines()
        .for_each(|line| invoke_isolated(&mut on_stdout, line));

    let mut on_stderr = command.stderr_handler();
    stderr
        .lines()
        .for_each(|line| invoke_isolated(&mut on_stderr, line));
}
/// Calls the handler (if any) with `line`, catching a panic so it cannot escape.
///
/// After a panic the handler slot is cleared, so later lines are no longer delivered; with
/// the `tracing` feature enabled a warning is logged as well.
fn invoke_isolated(handler: &mut Option<crate::pump::LineHandler>, line: &str) {
    let Some(callback) = handler else {
        return;
    };
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| callback(line)));
    if outcome.is_ok() {
        return;
    }
    *handler = None;
    #[cfg(feature = "tracing")]
    tracing::warn!(
        target: "processkit",
        "line handler panicked; disabled for the rest of the run"
    );
}
#[async_trait::async_trait]
impl ProcessRunner for ScriptedRunner {
async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
let program = command.program().to_string_lossy().into_owned();
if let Some(token) = command.cancel_token()
&& token.is_cancelled()
{
return Err(crate::error::Error::Cancelled { program });
}
if !command.stdout_is_piped() {
return Err(crate::error::Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"`{program}`: stdout is not piped (Command::stdout was set to Inherit/Null), \
so the capture verbs have nothing to read — use StdioMode::Piped to capture it"
),
)));
}
let timeout = command.configured_timeout();
let reply = self.matched_reply(command, &program)?;
if reply.pending {
return park_until_cancelled(command, program).await;
}
replay_line_handlers(command, &reply.stdout, &reply.stderr);
Ok(reply
.clone()
.into_result(program, timeout)
.with_ok_codes(command.ok_codes_vec()))
}
async fn start(&self, command: &Command) -> Result<crate::RunningProcess> {
let program = command.program().to_string_lossy().into_owned();
if let Some(token) = command.cancel_token()
&& token.is_cancelled()
{
return Err(crate::error::Error::Cancelled { program });
}
let reply = self.matched_reply(command, &program)?;
Ok(reply.clone().into_running(command))
}
}
async fn park_until_cancelled(command: &Command, program: String) -> Result<ProcessResult<String>> {
if let Some(token) = command.cancel_token() {
token.cancelled().await;
return Err(crate::error::Error::Cancelled { program });
}
std::future::pending().await
}
/// A snapshot of one command handed to a recording runner.
#[derive(Clone, PartialEq, Eq)]
pub struct Invocation {
/// Program name of the command.
pub program: OsString,
/// Arguments of the command, in order.
pub args: Vec<OsString>,
/// Working directory set on the command, if any.
pub cwd: Option<PathBuf>,
/// Environment overrides copied from the command as (name, optional value) pairs.
pub envs: Vec<(OsString, Option<OsString>)>,
/// Whether the command had a non-empty stdin source.
pub has_stdin: bool,
}
impl std::fmt::Debug for Invocation {
    /// Redacting debug output: the arguments are shown only as a count and the
    /// environment only through `redacted_env_names`, so their values are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let arg_count = self.args.len();
        let mut out = f.debug_struct("Invocation");
        out.field("program", &self.program);
        out.field("args", &arg_count);
        out.field("cwd", &self.cwd);
        out.field("env_names", &crate::command::redacted_env_names(&self.envs));
        out.field("has_stdin", &self.has_stdin);
        out.finish()
    }
}
impl Invocation {
pub(crate) fn from_command(command: &Command) -> Self {
Self {
program: command.program().to_os_string(),
args: command.arguments().to_vec(),
cwd: command.working_dir().map(std::path::Path::to_path_buf),
envs: command.env_overrides().to_vec(),
has_stdin: command
.stdin_source()
.is_some_and(|stdin| !stdin.is_empty()),
}
}
pub fn has_flag(&self, flag: impl AsRef<OsStr>) -> bool {
let flag = flag.as_ref();
self.args.iter().any(|a| a == flag)
}
pub fn args_str(&self) -> Vec<String> {
self.args
.iter()
.map(|a| a.to_string_lossy().into_owned())
.collect()
}
}
/// A `ProcessRunner` wrapper that records every command before delegating to `inner`.
pub struct RecordingRunner<R: ProcessRunner = ScriptedRunner> {
// Runner that actually serves the commands.
inner: R,
// Invocations recorded so far, in call order.
calls: Mutex<Vec<Invocation>>,
}
impl<R: ProcessRunner> std::fmt::Debug for RecordingRunner<R> {
    /// Prints only the number of recorded calls (0 when the lock is poisoned).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let recorded = match self.calls.lock() {
            Ok(log) => log.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("RecordingRunner");
        out.field("calls", &recorded);
        out.finish_non_exhaustive()
    }
}
impl RecordingRunner<ScriptedRunner> {
    /// Builds a recorder over a scripted runner that answers every command with `reply`.
    pub fn replying(reply: Reply) -> Self {
        let scripted = ScriptedRunner::new().fallback(reply);
        Self::new(scripted)
    }
}
impl<R: ProcessRunner> RecordingRunner<R> {
    /// Wraps `inner` with an empty call log.
    pub fn new(inner: R) -> Self {
        let calls = Mutex::new(Vec::new());
        Self { inner, calls }
    }

    /// Returns a copy of every invocation recorded so far, in call order.
    ///
    /// # Panics
    ///
    /// Panics when the call log's lock is poisoned.
    pub fn calls(&self) -> Vec<Invocation> {
        let log = self.calls.lock().expect("recorder lock poisoned");
        log.to_vec()
    }

    /// Returns the single recorded invocation.
    ///
    /// # Panics
    ///
    /// Panics unless exactly one call was recorded.
    pub fn only_call(&self) -> Invocation {
        let mut calls = self.calls();
        assert_eq!(
            calls.len(),
            1,
            "expected exactly one call, got {}",
            calls.len()
        );
        calls.pop().expect("length checked above")
    }
}
#[async_trait::async_trait]
impl<R: ProcessRunner> ProcessRunner for RecordingRunner<R> {
    /// Records the command, then delegates to the inner runner's `output_string`.
    async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
        {
            // Scope the guard so it is released before the await below.
            let mut log = self.calls.lock().expect("recorder lock poisoned");
            log.push(Invocation::from_command(command));
        }
        self.inner.output_string(command).await
    }

    /// Records the command, then delegates to the inner runner's `start`.
    async fn start(&self, command: &Command) -> Result<crate::RunningProcess> {
        {
            // Scope the guard so it is released before the await below.
            let mut log = self.calls.lock().expect("recorder lock poisoned");
            log.push(Invocation::from_command(command));
        }
        self.inner.start(command).await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runner::ProcessRunnerExt;
// The Debug rendering of an Invocation must show the argument count and the environment
// variable names, but none of the argument or environment values.
#[test]
fn invocation_debug_redacts_argv_and_env_values() {
let inv = Invocation {
program: "git".into(),
args: vec!["--token=secret123".into(), "another-secret".into()],
cwd: None,
envs: vec![
("API_KEY".into(), Some("topsecret-value".into())),
("GIT_PAGER".into(), None),
],
has_stdin: false,
};
let dbg = format!("{inv:?}");
assert!(
!dbg.contains("secret123") && !dbg.contains("another-secret"),
"argv values must not appear: {dbg}"
);
assert!(
!dbg.contains("topsecret-value"),
"env values must not appear: {dbg}"
);
assert!(
dbg.contains("API_KEY") && dbg.contains("GIT_PAGER"),
"env names should appear: {dbg}"
);
assert!(dbg.contains("args: 2"), "arg count should appear: {dbg}");
}
// `first_line` over a scripted runner returns the first canned line accepted by the
// predicate, and `None` when no line is accepted.
#[tokio::test]
async fn first_line_is_reachable_through_the_scripted_seam() {
use crate::runner::ProcessRunnerExt;
let runner = ScriptedRunner::new().on(
["git", "log"],
Reply::lines(["alpha", "beta ready", "gamma"]),
);
let found = runner
.first_line(&Command::new("git").arg("log"), |l| l.contains("ready"))
.await
.expect("first_line");
assert_eq!(found.as_deref(), Some("beta ready"));
let none = runner
.first_line(&Command::new("git").arg("log"), |l| l.contains("zzz"))
.await
.expect("first_line");
assert_eq!(none, None, "no matching line yields None");
}
// With an already-cancelled token `first_line` must fail with `Cancelled`; without a
// token the same unmatched probe is still `Ok(None)`.
#[tokio::test]
async fn first_line_reports_cancellation_not_a_missing_line() {
use crate::runner::ProcessRunnerExt;
use tokio_util::sync::CancellationToken;
let runner =
ScriptedRunner::new().on(["svc", "run"], Reply::lines(["warming up", "still busy"]));
let token = CancellationToken::new();
// The token is cancelled before the command is even built.
token.cancel(); let cmd = Command::new("svc")
.arg("run")
.cancel_on(token.child_token());
let result = runner.first_line(&cmd, |l| l.contains("ready")).await;
assert!(
matches!(result, Err(crate::Error::Cancelled { .. })),
"a cancelled probe must report Cancelled, got {result:?}"
);
let cmd = Command::new("svc").arg("run");
let none = runner
.first_line(&cmd, |l| l.contains("ready"))
.await
.expect("first_line");
assert_eq!(
none, None,
"no cancellation → a missing line is still Ok(None)"
);
}
// A scripted `start` has no pid, streams its canned stdout lines in order, and finishes
// with exit code 0 and empty stderr.
#[tokio::test]
async fn scripted_start_streams_canned_lines_through_real_pumps() {
use tokio_stream::StreamExt;
let runner =
ScriptedRunner::new().on(["git", "log"], Reply::lines(["first", "second", "third"]));
let cmd = Command::new("git").arg("log");
let mut run = runner.start(&cmd).await.expect("scripted start");
assert_eq!(run.pid(), None, "a scripted child has no OS identity");
let mut lines = run.stdout_lines().unwrap();
let mut seen = Vec::new();
while let Some(line) = lines.next().await {
seen.push(line);
}
assert_eq!(seen, ["first", "second", "third"]);
let finish = run.finish().await.expect("finish");
assert_eq!(finish.outcome, Outcome::Exited(0));
assert_eq!(finish.stderr, "");
}
// A failing reply with canned stdout still satisfies `wait_for_line`, and `finish` then
// reports the canned exit code and stderr (without its trailing newline).
#[tokio::test]
async fn scripted_start_supports_probes_and_failing_finish() {
let runner = ScriptedRunner::new().fallback(
Reply::fail(7, "boom: detail\n").with_stdout("starting up\nready to serve\n"),
);
let cmd = Command::new("server");
let mut run = runner.start(&cmd).await.expect("scripted start");
run.wait_for_line(|l| l.contains("ready"), std::time::Duration::from_secs(5))
.await
.expect("the canned banner satisfies the probe");
let finish = run.finish().await.expect("finish");
assert_eq!(finish.outcome, Outcome::Exited(7));
assert_eq!(finish.stderr, "boom: detail");
}
// Consuming a scripted running process with `output_string` yields a successful result
// whose stdout is the canned lines without the trailing newline.
#[tokio::test]
async fn scripted_start_consumed_by_output_string() {
let runner = ScriptedRunner::new().fallback(Reply::lines(["a", "b"]));
let run = runner.start(&Command::new("x")).await.expect("start");
let result = run.output_string().await.expect("consume");
assert!(result.is_success());
assert_eq!(result.stdout(), "a\nb");
}
// Under a paused clock, a 10s line delay means nothing arrives within the first 5s;
// afterwards both lines arrive in order and the stream ends.
#[tokio::test(start_paused = true)]
async fn scripted_line_delay_delivers_incrementally() {
use tokio_stream::StreamExt;
let runner = ScriptedRunner::new().fallback(
Reply::lines(["tick", "tock"]).with_line_delay(std::time::Duration::from_secs(10)),
);
let mut run = runner
.start(&Command::new("clock"))
.await
.expect("scripted start");
let mut lines = run.stdout_lines().unwrap();
assert!(
tokio::time::timeout(std::time::Duration::from_secs(5), lines.next())
.await
.is_err(),
"no line may arrive before its scripted delay"
);
assert_eq!(lines.next().await.as_deref(), Some("tick"));
assert_eq!(lines.next().await.as_deref(), Some("tock"));
assert_eq!(lines.next().await, None);
}
// With a 3s command timeout and a 10s line delay, the stdout stream ends without
// yielding a line and `finish` reports `TimedOut`.
#[tokio::test(start_paused = true)]
async fn scripted_stream_is_bounded_by_command_timeout() {
use tokio_stream::StreamExt;
let runner = ScriptedRunner::new().fallback(
Reply::lines(["tick", "tock"]).with_line_delay(std::time::Duration::from_secs(10)),
);
let cmd = Command::new("clock").timeout(std::time::Duration::from_secs(3));
let mut run = runner.start(&cmd).await.expect("scripted start");
let mut lines = run.stdout_lines().unwrap();
assert_eq!(
lines.next().await,
None,
"the scripted stream must end at the command's deadline, not run to completion"
);
let finish = run.finish().await.expect("finish");
assert_eq!(
finish.outcome,
Outcome::TimedOut,
"a stream killed by its timeout reports TimedOut, like the bulk verbs"
);
}
// With a 1s line delay and a 2.5s timeout, only the first two of four lines are
// delivered before the run is reported as `TimedOut`.
#[tokio::test(start_paused = true)]
async fn scripted_stream_delivers_output_produced_before_the_deadline() {
use tokio_stream::StreamExt;
let runner = ScriptedRunner::new().fallback(
Reply::lines(["one", "two", "three", "four"])
.with_line_delay(std::time::Duration::from_secs(1)),
);
let cmd = Command::new("clock").timeout(std::time::Duration::from_millis(2500));
let mut run = runner.start(&cmd).await.expect("scripted start");
let mut lines = run.stdout_lines().unwrap();
let mut seen = Vec::new();
while let Some(line) = lines.next().await {
seen.push(line);
}
assert_eq!(
seen,
["one", "two"],
"lines produced before the 2.5s deadline survive; later ones are cut off"
);
let finish = run.finish().await.expect("finish");
assert_eq!(finish.outcome, Outcome::TimedOut);
}
// A script that completes well inside its 60s timeout delivers every line and finishes
// as `Exited(0)`, not `TimedOut`.
#[tokio::test(start_paused = true)]
async fn scripted_stream_under_a_generous_timeout_reports_natural_exit() {
use tokio_stream::StreamExt;
let runner = ScriptedRunner::new()
.fallback(Reply::lines(["a", "b"]).with_line_delay(std::time::Duration::from_secs(1)));
let cmd = Command::new("quick").timeout(std::time::Duration::from_secs(60));
let mut run = runner.start(&cmd).await.expect("scripted start");
let mut lines = run.stdout_lines().unwrap();
let mut seen = Vec::new();
while let Some(line) = lines.next().await {
seen.push(line);
}
assert_eq!(seen, ["a", "b"], "the whole short script is delivered");
let finish = run.finish().await.expect("finish");
assert_eq!(
finish.outcome,
Outcome::Exited(0),
"a run that finishes within its timeout is not spuriously TimedOut"
);
}
// The merged `output_events` stream also ends at the 3s command timeout, and `finish`
// reports `TimedOut`.
#[tokio::test(start_paused = true)]
async fn scripted_output_events_is_bounded_by_command_timeout() {
use tokio_stream::StreamExt;
let runner = ScriptedRunner::new().fallback(
Reply::lines(["tick", "tock"]).with_line_delay(std::time::Duration::from_secs(10)),
);
let cmd = Command::new("clock").timeout(std::time::Duration::from_secs(3));
let mut run = runner.start(&cmd).await.expect("scripted start");
let mut events = run.output_events().unwrap();
assert!(
events.next().await.is_none(),
"the merged event stream must end at the command's deadline"
);
let outcome = run.finish().await.expect("finish").outcome;
assert_eq!(outcome, Outcome::TimedOut);
}
// A pending reply started with a 3s timeout yields no stdout lines and finishes as
// `TimedOut`.
#[tokio::test(start_paused = true)]
async fn scripted_pending_stream_finishes_at_the_deadline() {
use tokio_stream::StreamExt;
let runner = ScriptedRunner::new().fallback(Reply::pending());
let cmd = Command::new("hang").timeout(std::time::Duration::from_secs(3));
let mut run = runner.start(&cmd).await.expect("scripted start");
let mut lines = run.stdout_lines().unwrap();
assert_eq!(lines.next().await, None, "a pending reply has no output");
let finish = run.finish().await.expect("finish");
assert_eq!(finish.outcome, Outcome::TimedOut);
}
// `wait_for_line` with a 10s window must wait (at least 9s of paused-clock time) and fail
// with `NotReady`, rather than being cut short by the command's 3s timeout.
#[tokio::test(start_paused = true)]
async fn wait_for_line_does_not_arm_the_command_timeout() {
let runner = ScriptedRunner::new().fallback(
Reply::lines(["working", "working"])
.with_line_delay(std::time::Duration::from_secs(30)),
);
let cmd = Command::new("server").timeout(std::time::Duration::from_secs(3));
let mut run = runner.start(&cmd).await.expect("scripted start");
let start = tokio::time::Instant::now();
let err = run
.wait_for_line(|l| l.contains("ready"), std::time::Duration::from_secs(10))
.await
.expect_err("the line never arrives, so the probe is NotReady");
let waited = start.elapsed();
assert!(matches!(err, crate::Error::NotReady { .. }), "got {err:?}");
assert!(
waited >= std::time::Duration::from_secs(9),
"the probe must wait its full `within` (10s), not be cut short by the \
3s command timeout: waited {waited:?}"
);
}
// After a failed 1s probe, `finish` must still enforce the command's 3s timeout and
// report `TimedOut`.
#[tokio::test(start_paused = true)]
async fn finish_after_wait_for_line_still_times_out_at_the_command_deadline() {
let runner = ScriptedRunner::new().fallback(
Reply::lines(["working", "working"])
.with_line_delay(std::time::Duration::from_secs(30)),
);
let cmd = Command::new("hang").timeout(std::time::Duration::from_secs(3));
let mut run = runner.start(&cmd).await.expect("scripted start");
let err = run
.wait_for_line(|_| false, std::time::Duration::from_secs(1))
.await
.expect_err("nothing matches within 1s");
assert!(matches!(err, crate::Error::NotReady { .. }), "got {err:?}");
let finish = run.finish().await.expect("finish");
assert_eq!(
finish.outcome,
Outcome::TimedOut,
"the command timeout is still enforced by finish after a probe"
);
}
// A `Reply::timeout()` served through `start` is captured as a timed-out, unsuccessful
// result.
#[tokio::test]
async fn scripted_timeout_reply_surfaces_through_start() {
let runner = ScriptedRunner::new().fallback(Reply::timeout());
let cmd = Command::new("slow").timeout(std::time::Duration::from_secs(9));
let run = runner.start(&cmd).await.expect("start");
let result = run.output_string().await.expect("a timeout is captured");
assert!(result.timed_out());
assert!(!result.is_success());
}
// `output_string` feeds each canned stdout line to the stdout handler; with no canned
// stderr the stderr handler is never called.
#[tokio::test]
async fn output_replays_canned_lines_through_handlers() {
use std::sync::{Arc, Mutex};
let seen = Arc::new(Mutex::new(Vec::new()));
let errs = Arc::new(Mutex::new(Vec::new()));
let runner =
ScriptedRunner::new().on(["git", "fetch"], Reply::ok("a\nb\n").with_stdout("a\nb\n"));
let cmd = Command::new("git")
.arg("fetch")
.on_stdout_line({
let seen = seen.clone();
move |l| seen.lock().unwrap().push(l.to_owned())
})
.on_stderr_line({
let errs = errs.clone();
move |l| errs.lock().unwrap().push(l.to_owned())
});
let result = runner.output_string(&cmd).await.expect("scripted run");
assert!(result.is_success());
assert_eq!(*seen.lock().unwrap(), ["a", "b"]);
assert!(errs.lock().unwrap().is_empty());
}
// A handler that panics on its second call must not fail the run; it is invoked for the
// first two lines only and skipped for the third.
#[tokio::test]
async fn output_isolates_a_panicking_line_handler() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let calls = Arc::new(AtomicUsize::new(0));
let runner = ScriptedRunner::new().fallback(Reply::ok("one\ntwo\nthree\n"));
let cmd = Command::new("x").on_stdout_line({
let calls = calls.clone();
move |_| {
if calls.fetch_add(1, Ordering::SeqCst) == 1 {
panic!("boom on the second line");
}
}
});
let result = runner
.output_string(&cmd)
.await
.expect("a handler panic must not fail the scripted run");
assert!(result.is_success());
assert_eq!(
calls.load(Ordering::SeqCst),
2,
"handler disabled after its panic (called for lines 1 and 2 only)"
);
}
// `output_string` on a command whose stdout is `StdioMode::Null` must fail with an
// `Io` error of kind `InvalidInput`.
#[tokio::test]
async fn output_on_non_piped_stdout_errors_like_the_live_path() {
let runner = ScriptedRunner::new().fallback(Reply::ok("canned"));
let cmd = Command::new("x").stdout(crate::StdioMode::Null);
let err = runner
.output_string(&cmd)
.await
.expect_err("a non-piped stdout must error on a capture verb");
match err {
crate::error::Error::Io(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput)
}
other => panic!("expected Io(InvalidInput), got {other:?}"),
}
}
// `output_string` with a token cancelled up front returns `Cancelled` instead of the
// fallback reply.
#[tokio::test]
async fn output_with_an_already_cancelled_token_short_circuits() {
let token = crate::CancellationToken::new();
token.cancel();
let runner = ScriptedRunner::new().fallback(Reply::ok("must not be returned"));
let cmd = Command::new("x").cancel_on(token);
let err = runner
.output_string(&cmd)
.await
.expect_err("a pre-cancelled token short-circuits");
assert!(
matches!(err, crate::error::Error::Cancelled { .. }),
"got {err:?}"
);
}
// `start` with a token cancelled up front returns `Cancelled` instead of a running
// process.
#[tokio::test]
async fn start_with_an_already_cancelled_token_short_circuits() {
let token = crate::CancellationToken::new();
token.cancel();
let runner = ScriptedRunner::new().fallback(Reply::ok("must not start"));
let cmd = Command::new("x").cancel_on(token);
let err = runner
.start(&cmd)
.await
.expect_err("a pre-cancelled token short-circuits start");
assert!(
matches!(err, crate::error::Error::Cancelled { .. }),
"got {err:?}"
);
}
// With a line delay, one stdout line and ten stderr lines, all ten stderr lines must
// still be captured: the shorter stream must not cut the longer one off.
#[tokio::test(start_paused = true)]
async fn scripted_line_delay_does_not_truncate_a_longer_stderr() {
let stderr_text = (1..=10)
.map(|n| format!("e{n}"))
.collect::<Vec<_>>()
.join("\n")
+ "\n";
let reply = Reply::fail(3, stderr_text)
.with_stdout("out\n")
.with_line_delay(std::time::Duration::from_secs(1));
let runner = ScriptedRunner::new().fallback(reply);
let run = runner.start(&Command::new("x")).await.expect("start");
let result = run.output_string().await.expect("consume");
assert_eq!(
result.stderr(),
"e1\ne2\ne3\ne4\ne5\ne6\ne7\ne8\ne9\ne10",
"all 10 stderr lines survive despite only 1 stdout line"
);
}
// By the time `output_string` on a started run resolves, the stdout handler must have
// been called for all 100 canned lines.
#[tokio::test]
async fn handler_calls_happen_before_the_consuming_verb_resolves() {
use std::sync::{Arc, Mutex};
let seen = Arc::new(Mutex::new(0usize));
let lines: Vec<String> = (1..=100).map(|n| format!("line {n}")).collect();
let runner = ScriptedRunner::new().fallback(Reply::lines(lines));
let cmd = Command::new("x").on_stdout_line({
let seen = seen.clone();
move |_| *seen.lock().unwrap() += 1
});
let run = runner.start(&cmd).await.expect("scripted start");
let result = run.output_string().await.expect("consume");
assert!(result.is_success());
assert_eq!(
*seen.lock().unwrap(),
100,
"all handler calls happen-before the verb resolves"
);
}
// `RecordingRunner::start` records the invocation; the record remains available after
// the returned handle is dropped.
#[tokio::test]
async fn recording_runner_records_start_invocations() {
let rec = RecordingRunner::new(ScriptedRunner::new().fallback(Reply::lines(["x"])));
let run = rec
.start(&Command::new("gh").args(["run", "watch"]))
.await
.expect("recorded start");
drop(run); assert_eq!(rec.only_call().args_str(), ["run", "watch"]);
}
// Consuming a pending scripted run does not resolve within an hour of paused-clock time;
// once the token is cancelled it resolves with `Cancelled`.
#[tokio::test(start_paused = true)]
async fn scripted_pending_start_is_cancellable() {
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::pending());
let cmd = Command::new("watch").cancel_on(token.clone());
let run = runner.start(&cmd).await.expect("start");
let consume = run.output_string();
tokio::pin!(consume);
assert!(
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut consume)
.await
.is_err(),
"a pending scripted run must not resolve before cancellation"
);
token.cancel();
let err = tokio::time::timeout(std::time::Duration::from_secs(3600), consume)
.await
.expect("the token resolves the run")
.expect_err("cancellation is always an error");
assert!(
matches!(err, crate::error::Error::Cancelled { .. }),
"got {err:?}"
);
}
// Calling `start_kill` on a scripted run that replies with exit code 5 must still let
// `wait` report `Exited(5)` rather than a signal outcome.
#[tokio::test]
async fn scripted_kill_after_natural_exit_keeps_the_cached_outcome() {
let runner = ScriptedRunner::new().fallback(Reply::fail(5, "boom"));
let mut run = runner.start(&Command::new("x")).await.expect("start");
run.start_kill().expect("kill is best-effort");
let outcome = run.wait().await.expect("wait after a post-exit kill");
assert_eq!(
outcome,
Outcome::Exited(5),
"a post-exit kill keeps the cached exit code, not Signalled"
);
}
// `wait_any` over a single pending scripted handle does not resolve until the token is
// cancelled, at which point it surfaces `Cancelled`.
#[tokio::test(start_paused = true)]
async fn wait_any_on_a_pending_scripted_handle_is_cancellable() {
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::pending());
let cmd = Command::new("watch").cancel_on(token.clone());
let mut run = runner.start(&cmd).await.expect("start");
let mut handles = [&mut run];
let race = crate::wait_any(&mut handles);
tokio::pin!(race);
assert!(
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut race)
.await
.is_err(),
"a pending scripted run must not resolve before cancellation"
);
token.cancel();
let err = tokio::time::timeout(std::time::Duration::from_secs(3600), race)
.await
.expect("the token resolves the race")
.expect_err("a cancelled run surfaces as an error");
assert!(
matches!(err, crate::error::Error::Cancelled { .. }),
"got {err:?}"
);
}
// A prefix rule for `git status` serves its canned stdout to a matching command.
#[tokio::test]
async fn prefix_rule_matches_and_replies() {
let runner = ScriptedRunner::new().on(["git", "status"], Reply::ok("clean"));
let out = runner
.output_string(&Command::new("git").arg("status"))
.await
.unwrap();
assert_eq!(out.stdout(), "clean");
assert!(out.is_success());
}
// A predicate rule answers commands it accepts; any other command gets the fallback.
#[tokio::test]
async fn predicate_rule_and_fallback() {
let runner = ScriptedRunner::new()
.when(
|c| c.arguments().iter().any(|a| a == "--version"),
Reply::ok("v1"),
)
.fallback(Reply::fail(1, "unknown"));
assert_eq!(
runner
.output_string(&Command::new("tool").arg("--version"))
.await
.unwrap()
.stdout(),
"v1"
);
let miss = runner
.output_string(&Command::new("tool").arg("x"))
.await
.unwrap();
assert_eq!(miss.code(), Some(1));
assert!(!miss.is_success());
}
// An unmatched command with no fallback fails with `Error::Spawn` naming the program and
// wrapping a `NotFound` I/O error.
#[tokio::test]
async fn no_match_without_fallback_is_a_not_found_spawn_error() {
let runner = ScriptedRunner::new().on(["git", "status"], Reply::ok("clean"));
let err = runner
.output_string(&Command::new("git").arg("log"))
.await
.expect_err("an unmatched command with no fallback must error");
match err {
crate::error::Error::Spawn { program, source } => {
assert_eq!(program, "git");
assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
}
other => panic!("expected Error::Spawn, got {other:?}"),
}
}
// Prefix matching compares whole arguments: `foo bar` matches the `foo` prefix, while the
// single argument `foobar` does not.
#[tokio::test]
async fn prefix_matches_whole_elements_not_substrings() {
let runner = ScriptedRunner::new().on(["tool", "foo"], Reply::ok("hit"));
assert!(
runner
.output_string(&Command::new("tool").args(["foo", "bar"]))
.await
.is_ok()
);
assert!(
runner
.output_string(&Command::new("tool").arg("foobar"))
.await
.is_err(),
"substring of an element is not a prefix match"
);
}
// A prefix rule also checks the program name: `rm status` must fall through to the
// fallback instead of matching the `git status` rule.
#[tokio::test]
async fn on_matches_the_program_not_just_the_args() {
let runner = ScriptedRunner::new()
.on(["git", "status"], Reply::ok("on branch main"))
.fallback(Reply::fail(1, "unmatched"));
let hit = runner
.output_string(&Command::new("git").arg("status"))
.await
.unwrap();
assert_eq!(hit.stdout(), "on branch main");
let miss = runner
.output_string(&Command::new("rm").arg("status"))
.await
.unwrap();
assert_eq!(
miss.code(),
Some(1),
"a different program with the same args must NOT match the rule"
);
}
// A two-reply sequence serves the first reply on the first call and the second reply on
// every later call.
#[tokio::test]
async fn on_sequence_yields_each_reply_then_repeats_the_last() {
let runner = ScriptedRunner::new().on_sequence(
["git", "push"],
[Reply::fail(1, "rejected"), Reply::ok("pushed")],
);
assert_eq!(
runner
.output_string(&Command::new("git").arg("push"))
.await
.unwrap()
.code(),
Some(1),
"1st call: the first reply (fail)"
);
for nth in ["2nd", "3rd"] {
assert_eq!(
runner
.output_string(&Command::new("git").arg("push"))
.await
.unwrap()
.stdout(),
"pushed",
"{nth} call: the last reply repeats"
);
}
}
// A timeout reply is a timed-out result for `output_string` and an `Error::Timeout` for
// `run` and `exit_code`; the error carries the command's configured timeout.
#[tokio::test]
async fn timeout_reply_surfaces_as_timeout_error() {
use crate::error::Error;
let runner = ScriptedRunner::new().fallback(Reply::timeout());
let out = runner.output_string(&Command::new("git")).await.unwrap();
assert!(out.timed_out());
assert!(matches!(
runner.run(&Command::new("git")).await.unwrap_err(),
Error::Timeout { .. }
));
assert!(matches!(
runner.exit_code(&Command::new("git")).await.unwrap_err(),
Error::Timeout { .. }
));
let cmd = Command::new("git").timeout(std::time::Duration::from_secs(7));
match runner.run(&cmd).await.unwrap_err() {
Error::Timeout { timeout, .. } => {
assert_eq!(timeout, std::time::Duration::from_secs(7))
}
other => panic!("expected Timeout, got {other:?}"),
}
}
// `output_bytes` over a scripted runner returns the canned stdout as bytes, including an
// embedded NUL.
#[tokio::test]
async fn scripted_output_bytes_serves_canned_stdout_through_the_seam() {
let runner = ScriptedRunner::new().fallback(Reply::ok("raw\u{0}bytes"));
let result = runner
.output_bytes(&Command::new("git").args(["cat-file", "blob", "HEAD"]))
.await
.expect("scripted output_bytes");
assert_eq!(result.stdout(), b"raw\x00bytes");
assert!(result.is_success());
}
// A signalled reply with number 9 yields `Outcome::Signalled(Some(9))` from
// `output_string` and `Error::Signalled { signal: Some(9), .. }` from `run`.
#[tokio::test]
async fn signalled_reply_carries_signal_number() {
use crate::error::Error;
let runner = ScriptedRunner::new().fallback(Reply::signalled(Some(9)));
let result = runner.output_string(&Command::new("tool")).await.unwrap();
assert_eq!(result.outcome(), crate::Outcome::Signalled(Some(9)));
assert!(matches!(
runner.run(&Command::new("tool")).await.unwrap_err(),
Error::Signalled {
signal: Some(9),
..
}
));
}
// A signalled reply without a number yields `Signalled(None)` from both `output_string`
// and `run`.
#[tokio::test]
async fn signalled_reply_without_a_number_is_signalled_none() {
use crate::error::Error;
let runner = ScriptedRunner::new().fallback(Reply::signalled(None));
let result = runner.output_string(&Command::new("tool")).await.unwrap();
assert_eq!(result.outcome(), crate::Outcome::Signalled(None));
assert!(matches!(
runner.run(&Command::new("tool")).await.unwrap_err(),
Error::Signalled { signal: None, .. }
));
}
// `output_string` on a pending reply does not resolve until the token is cancelled, then
// fails with `Cancelled` naming the program.
#[tokio::test(start_paused = true)]
async fn pending_parks_until_the_token_fires_then_cancels() {
use crate::error::Error;
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().on(["gh", "run", "watch"], Reply::pending());
let cmd = Command::new("gh")
.args(["run", "watch"])
.cancel_on(token.clone());
let call = runner.output_string(&cmd);
tokio::pin!(call);
assert!(
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut call)
.await
.is_err(),
"a pending reply must not resolve before cancellation"
);
token.cancel();
match call.await {
Err(Error::Cancelled { program }) => assert_eq!(program, "gh"),
other => panic!("expected Error::Cancelled, got {other:?}"),
}
}
// Without a cancellation token, `output_string` on a pending reply is still unresolved
// after an hour of paused-clock time.
#[tokio::test(start_paused = true)]
async fn pending_without_a_token_parks_forever() {
let runner = ScriptedRunner::new().fallback(Reply::pending());
let cmd = Command::new("gh");
let call = runner.output_string(&cmd);
tokio::pin!(call);
assert!(
tokio::time::timeout(std::time::Duration::from_secs(3600), &mut call)
.await
.is_err()
);
}
// `probe` maps exit code 0 to `true` and 1 to `false`; exit code 2 surfaces as
// `Error::Exit` and a timeout reply as `Error::Timeout`.
#[tokio::test]
async fn probe_reads_exit_code_as_bool() {
use crate::error::Error;
let runner = ScriptedRunner::new()
.on(["t", "yes"], Reply::ok(""))
.on(["t", "no"], Reply::fail(1, ""))
.on(["t", "boom"], Reply::fail(2, "bad"))
.fallback(Reply::timeout());
assert!(runner.probe(&Command::new("t").arg("yes")).await.unwrap());
assert!(!runner.probe(&Command::new("t").arg("no")).await.unwrap());
assert!(matches!(
runner
.probe(&Command::new("t").arg("boom"))
.await
.unwrap_err(),
Error::Exit { code: 2, .. }
));
assert!(matches!(
runner
.probe(&Command::new("t").arg("other"))
.await
.unwrap_err(),
Error::Timeout { .. }
));
}
// `run` on canned stdout " hello \n" is expected to return " hello": the trailing
// whitespace is removed while the leading space is kept.
#[tokio::test]
async fn run_ext_trims_and_checks_success() {
let runner = ScriptedRunner::new().fallback(Reply::ok(" hello \n"));
let trimmed = runner.run(&Command::new("echo")).await.unwrap();
assert_eq!(trimmed, " hello");
}
// The recorder captures the program, working directory and flags of the one command it
// saw, and `has_flag` is false for a flag that was not passed.
#[tokio::test]
async fn recording_captures_args_cwd_and_absence() {
let recorder = RecordingRunner::replying(Reply::ok("ok"));
let _ = recorder
.output_string(
&Command::new("gh")
.current_dir("/repo")
.args(["pr", "create", "--title", "T"]),
)
.await
.unwrap();
let call = recorder.only_call();
assert_eq!(call.program, OsString::from("gh"));
assert_eq!(call.cwd, Some(PathBuf::from("/repo")));
assert!(call.has_flag("--title"));
assert!(!call.has_flag("--base"), "no --base flag was passed");
}
}