use crate::error::StreamError;
use serde::Serialize;
use std::io::{BufRead, BufReader, Read};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex, OnceLock};
use std::thread;
use std::time::Duration;
/// Events delivered to the callback passed to `spawn_streaming`.
///
/// Serialized as an internally tagged object whose `kind` field names the
/// variant.
///
/// NOTE(review): serde's container-level `rename_all` on an enum renames the
/// variants; the fields of each variant presumably still serialize as
/// `run_id` / `exit_code`. Confirm that consumers expect snake_case fields.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
#[non_exhaustive]
pub enum ProcessEvent {
/// Emitted once by `spawn_streaming` after the child has been spawned and
/// before the output pump threads are started.
Started { run_id: String },
/// One line read from the child's standard output.
Stdout { run_id: String, line: String },
/// One line read from the child's standard error.
Stderr { run_id: String, line: String },
/// A stream read failed, or polling the child's exit status failed;
/// `message` carries the formatted error.
Error { run_id: String, message: String },
/// The child exited and both output pump threads have finished.
Exited {
run_id: String,
// Value of `ExitStatus::code()` for the finished child.
exit_code: Option<i32>,
// Snapshot of the handle's cancelled flag taken when the exit was seen.
cancelled: bool,
},
}
/// Handle to a process started by `spawn_streaming`.
///
/// Cloning is cheap: every clone points at the same shared `HandleInner`, so
/// cancelling through one clone is visible through all of them.
#[derive(Clone, Debug)]
pub struct ProcessHandle {
// State shared with the exit-watcher thread and any delayed-kill thread.
inner: Arc<HandleInner>,
}
/// State shared between a `ProcessHandle`, its clones and the background
/// threads created for the run.
#[derive(Debug)]
struct HandleInner {
// `Some` while the child is tracked; the exit watcher resets it to `None`
// after it has delivered the final event.
child: Mutex<Option<Child>>,
// Set to `true` by `ProcessHandle::cancel`; never reset.
cancelled: AtomicBool,
}
impl ProcessHandle {
    /// Requests termination of the child and marks the run as cancelled.
    ///
    /// The cancelled flag is set unconditionally. If the child is no longer
    /// tracked, or has already exited, nothing else happens. Otherwise:
    ///
    /// * on Unix the child receives `SIGTERM`, and a background thread
    ///   force-kills it 1.5 s later if it is still tracked;
    /// * on other platforms the child is killed immediately.
    ///
    /// Failures to deliver the signal are ignored (best effort).
    ///
    /// # Errors
    ///
    /// Returns [`StreamError::CancelLockPoisoned`] when the mutex guarding the
    /// child is poisoned.
    pub fn cancel(&self) -> Result<(), StreamError> {
        self.inner.cancelled.store(true, Ordering::SeqCst);
        let mut guard = self
            .inner
            .child
            .lock()
            .map_err(|_| StreamError::CancelLockPoisoned)?;
        let Some(child) = guard.as_mut() else {
            return Ok(());
        };

        // The exit watcher keeps the `Child` in the slot for a while after it
        // has reaped it (it joins the pumps and runs the callback first). Once
        // reaped, the PID may belong to an unrelated process, so never signal
        // a child that has already exited.
        if let Ok(Some(_)) = child.try_wait() {
            return Ok(());
        }

        #[cfg(unix)]
        {
            // A non-positive pid would address a process group (or every
            // process) in kill(2), so anything that does not convert to a
            // strictly positive pid falls back to the std kill path.
            let pid = child.id() as libc::pid_t;
            if pid > 0 {
                // SAFETY: `kill` has no memory-safety preconditions. `pid` is
                // strictly positive and belongs to a child that, per the
                // `try_wait` check above (made under the same lock), has not
                // been reaped, so it still identifies our process.
                let _ = unsafe { libc::kill(pid, libc::SIGTERM) };

                // Grace period: escalate to a hard kill if the child is still
                // tracked after 1.5 s.
                let inner = Arc::clone(&self.inner);
                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(1500));
                    if let Ok(mut guard) = inner.child.lock() {
                        if let Some(child) = guard.as_mut() {
                            let _ = child.kill();
                        }
                    }
                });
            } else {
                let _ = child.kill();
            }
        }
        #[cfg(not(unix))]
        {
            let _ = child.kill();
        }
        Ok(())
    }

    /// Returns `true` once [`cancel`](Self::cancel) has been called on this
    /// handle or any of its clones.
    pub fn was_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::SeqCst)
    }
}
pub fn spawn_streaming<F>(
program: PathBuf,
args: Vec<String>,
env: Vec<(String, String)>,
cwd: PathBuf,
run_id: String,
callback: F,
) -> Result<ProcessHandle, StreamError>
where
F: FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
{
let augmented_path = augment_path_for_node(&program);
let mut command = Command::new(&program);
command
.args(&args)
.current_dir(&cwd)
.env("PATH", augmented_path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, value) in &env {
command.env(key, value);
}
let mut child = command.spawn().map_err(|source| StreamError::Spawn {
program: program.display().to_string(),
source,
})?;
let stdout = child
.stdout
.take()
.ok_or(StreamError::PipeNotCaptured { stream: "stdout" })?;
let stderr = child
.stderr
.take()
.ok_or(StreamError::PipeNotCaptured { stream: "stderr" })?;
let inner = Arc::new(HandleInner {
child: Mutex::new(Some(child)),
cancelled: AtomicBool::new(false),
});
let handle = ProcessHandle { inner: Arc::clone(&inner) };
let mut started_cb = callback.clone();
started_cb(ProcessEvent::Started { run_id: run_id.clone() });
let stdout_cb = callback.clone();
let stdout_run_id = run_id.clone();
let stdout_handle = thread::spawn(move || {
pump_lines(stdout, stdout_run_id, true, stdout_cb);
});
let stderr_cb = callback.clone();
let stderr_run_id = run_id.clone();
let stderr_handle = thread::spawn(move || {
pump_lines(stderr, stderr_run_id, false, stderr_cb);
});
let exit_inner = Arc::clone(&inner);
let mut exit_cb = callback;
let exit_run_id = run_id;
thread::spawn(move || {
let wait_result = loop {
{
let mut guard = match exit_inner.child.lock() {
Ok(guard) => guard,
Err(_) => return, };
match guard.as_mut() {
Some(child) => match child.try_wait() {
Ok(Some(status)) => break Ok(status),
Ok(None) => {} Err(err) => break Err(err),
},
None => return, }
} thread::sleep(Duration::from_millis(50));
};
let _ = stdout_handle.join();
let _ = stderr_handle.join();
let cancelled = exit_inner.cancelled.load(Ordering::SeqCst);
match wait_result {
Ok(status) => exit_cb(ProcessEvent::Exited {
run_id: exit_run_id.clone(),
exit_code: status.code(),
cancelled,
}),
Err(err) => exit_cb(ProcessEvent::Error {
run_id: exit_run_id.clone(),
message: format!("wait failed: {err}"),
}),
}
if let Ok(mut guard) = exit_inner.child.lock() {
*guard = None;
}
});
Ok(handle)
}
/// Reads `reader` line by line until EOF and forwards each line to `callback`.
///
/// Lines are reported as `Stdout` events when `is_stdout` is true and as
/// `Stderr` events otherwise. The trailing `\n` (and a `\r` directly before
/// it) is stripped. Bytes that are not valid UTF-8 are replaced with U+FFFD
/// rather than treated as an error: aborting on them would stop draining the
/// pipe, losing the rest of the output and potentially blocking the child on
/// a full pipe.
///
/// A genuine read error is reported as a single `Error` event, after which
/// the function returns.
fn pump_lines<R, F>(reader: R, run_id: String, is_stdout: bool, mut callback: F)
where
    R: Read,
    F: FnMut(ProcessEvent),
{
    let mut buffered = BufReader::new(reader);
    // One buffer reused for every line.
    let mut raw: Vec<u8> = Vec::new();
    loop {
        raw.clear();
        match buffered.read_until(b'\n', &mut raw) {
            Ok(0) => return,
            Ok(_) => {
                // Same terminator handling as `BufRead::lines`.
                if raw.last() == Some(&b'\n') {
                    raw.pop();
                    if raw.last() == Some(&b'\r') {
                        raw.pop();
                    }
                }
                let line = String::from_utf8_lossy(&raw).into_owned();
                let event = if is_stdout {
                    ProcessEvent::Stdout {
                        run_id: run_id.clone(),
                        line,
                    }
                } else {
                    ProcessEvent::Stderr {
                        run_id: run_id.clone(),
                        line,
                    }
                };
                callback(event);
            }
            Err(err) => {
                callback(ProcessEvent::Error {
                    run_id: run_id.clone(),
                    message: format!("stream read failed: {err}"),
                });
                return;
            }
        }
    }
}
/// Builds the `PATH` value for a child: the directory containing `program`
/// (when it has one) followed by the cached augmented node search path.
fn augment_path_for_node(program: &Path) -> String {
    let base = augmented_node_path();
    prepend_program_dir(program, base.as_str())
}
/// Puts the directory containing `program` in front of `base_path`.
///
/// Returns `base_path` unchanged when `program` has no parent directory (for
/// example a bare command name). When `base_path` is empty the directory is
/// returned on its own: joining with `:` would leave a trailing empty entry,
/// which POSIX interprets as the current working directory.
fn prepend_program_dir(program: &Path, base_path: &str) -> String {
    let program_dir = program
        .parent()
        .map(|dir| dir.display().to_string())
        .filter(|dir| !dir.is_empty());
    match program_dir {
        Some(dir) if base_path.is_empty() => dir,
        Some(dir) => format!("{dir}:{base_path}"),
        None => base_path.to_owned(),
    }
}
/// Returns the augmented search path, computing it on first use and serving
/// a copy of the cached value on every later call.
pub fn augmented_node_path() -> String {
    static SEARCH_PATH: OnceLock<String> = OnceLock::new();
    let cached: &String = SEARCH_PATH.get_or_init(compute_augmented_node_path);
    cached.to_owned()
}
/// Computes the search path: the inherited `PATH` (when set and non-empty)
/// followed by the login shell's `PATH`, or by the hard-coded directories
/// when the shell probe yields nothing. Non-absolute entries are dropped.
fn compute_augmented_node_path() -> String {
    let inherited = std::env::var("PATH").ok().filter(|value| !value.is_empty());
    let discovered = login_shell_path().unwrap_or_else(hardcoded_node_dirs);
    let combined = match inherited {
        Some(existing) => format!("{existing}:{discovered}"),
        None => discovered,
    };
    keep_absolute_entries(&combined)
}
/// Returns the `:`-separated `path` with every entry that does not start
/// with `/` removed (relative and empty entries included).
fn keep_absolute_entries(path: &str) -> String {
    let mut kept = String::with_capacity(path.len());
    for entry in path.split(':') {
        if !entry.starts_with('/') {
            continue;
        }
        // Kept entries are never empty, so an empty buffer means "first one".
        if !kept.is_empty() {
            kept.push(':');
        }
        kept.push_str(entry);
    }
    kept
}
/// Marker line the login-shell probe prints immediately before the `env`
/// dump; `parse_path_from_shell_output` only looks at the text that follows
/// its last occurrence.
const PATH_SENTINEL: &str = "__CLI_STREAM_PATH__";
/// Asks the user's login shell (`$SHELL -lic`) for its `PATH`.
///
/// The shell prints a sentinel line followed by its environment; stdout is
/// collected on a helper thread so the probe can give up after 4 seconds, in
/// which case the shell is killed. Returns `None` when `SHELL` is unset or
/// empty, the shell cannot be started, the probe times out, or no usable
/// `PATH=` line follows the sentinel.
#[cfg(unix)]
fn login_shell_path() -> Option<String> {
    let shell = match std::env::var("SHELL") {
        Ok(value) if !value.is_empty() => value,
        _ => return None,
    };
    let script = format!("printf '\\n{PATH_SENTINEL}\\n'; env");
    let spawned = Command::new(&shell)
        .args(["-lic", script.as_str()])
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn();
    let Ok(mut child) = spawned else {
        return None;
    };
    let mut stdout = child.stdout.take()?;

    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        let mut bytes = Vec::new();
        let _ = stdout.read_to_end(&mut bytes);
        let _ = sender.send(String::from_utf8_lossy(&bytes).into_owned());
    });

    let received = receiver.recv_timeout(Duration::from_secs(4));
    if received.is_err() {
        // Timed out (or the reader vanished): stop the shell before reaping.
        let _ = child.kill();
    }
    let _ = child.wait();
    parse_path_from_shell_output(&received.ok()?)
}
/// Non-Unix variant of the login-shell probe: no shell is queried, so this
/// always returns `None`.
#[cfg(not(unix))]
fn login_shell_path() -> Option<String> {
None
}
/// Extracts the `PATH` value from the shell probe's output.
///
/// Everything up to and including the last sentinel is ignored. The first
/// following line that starts with `PATH=` decides the result: its trimmed
/// value, or `None` when that value is empty. Returns `None` as well when
/// the sentinel or a `PATH=` line is missing.
fn parse_path_from_shell_output(output: &str) -> Option<String> {
    let (_, env_dump) = output.rsplit_once(PATH_SENTINEL)?;
    for line in env_dump.lines() {
        if let Some(value) = line.strip_prefix("PATH=") {
            let value = value.trim();
            return if value.is_empty() {
                None
            } else {
                Some(value.to_owned())
            };
        }
    }
    None
}
/// Fallback search path used when the login shell cannot be queried.
///
/// Starts with a fixed list of system and Homebrew directories. When `HOME`
/// is set and non-empty it appends `$HOME/.local/bin` and the `bin`
/// directory of every entry under `$HOME/.nvm/versions/node` that has one,
/// in the order the directory listing yields them.
fn hardcoded_node_dirs() -> String {
    let mut joined =
        String::from("/usr/local/bin:/opt/homebrew/bin:/usr/bin:/bin:/usr/sbin:/sbin");
    let home = match std::env::var("HOME") {
        Ok(value) if !value.is_empty() => value,
        _ => return joined,
    };
    let home = Path::new(&home);

    let mut extra_dirs = vec![home.join(".local/bin")];
    if let Ok(versions) = std::fs::read_dir(home.join(".nvm/versions/node")) {
        extra_dirs.extend(
            versions
                .flatten()
                .map(|version| version.path().join("bin"))
                .filter(|bin| bin.is_dir()),
        );
    }

    for dir in &extra_dirs {
        joined.push(':');
        joined.push_str(&dir.display().to_string());
    }
    joined
}
// Unit tests for the PATH-building helpers; none of them spawns the process
// under test.
#[cfg(test)]
mod tests {
use super::*;
// The fallback list must contain the Homebrew and system bin directories.
#[test]
fn hardcoded_fallback_includes_macos_defaults() {
let path = hardcoded_node_dirs();
assert!(path.contains("/opt/homebrew/bin"), "missing Apple-Silicon Homebrew bin");
assert!(path.contains("/usr/local/bin"), "missing Intel Homebrew / system bin");
assert!(path.contains("/usr/bin"), "missing system bin");
}
// Text before the sentinel (escape sequences, banners, a decoy PATH line) is
// ignored; a missing sentinel, missing PATH line or empty PATH yields None.
#[test]
fn parse_path_from_shell_output_skips_chatter_before_the_sentinel() {
let output = "\u{1b}]1337;RemoteHost=x\u{7}welcome banner\nPATH=/decoy\n__CLI_STREAM_PATH__\nHOME=/Users/x\nPATH=/opt/homebrew/bin:/usr/bin\nLANG=en_US";
assert_eq!(
parse_path_from_shell_output(output).as_deref(),
Some("/opt/homebrew/bin:/usr/bin")
);
assert_eq!(parse_path_from_shell_output("PATH=/usr/bin"), None);
assert_eq!(parse_path_from_shell_output("__CLI_STREAM_PATH__\nFOO=bar"), None);
assert_eq!(parse_path_from_shell_output("__CLI_STREAM_PATH__\nPATH=\nFOO=bar"), None);
}
// Only entries starting with '/' survive; relative and empty ones are dropped.
#[test]
fn keep_absolute_entries_drops_relative_and_empty() {
assert_eq!(
keep_absolute_entries("/opt/homebrew/bin:node_modules/.bin:/usr/bin:.::/bin"),
"/opt/homebrew/bin:/usr/bin:/bin"
);
assert_eq!(keep_absolute_entries("/usr/bin"), "/usr/bin");
assert_eq!(keep_absolute_entries(".:rel:"), "");
}
// The binary's directory goes first; a bare command name adds nothing.
#[test]
fn prepend_program_dir_puts_the_binary_dir_first() {
let combined = prepend_program_dir(
Path::new("/Users/x/.nvm/versions/node/v22/bin/bob"),
"/opt/homebrew/bin:/usr/bin",
);
assert!(combined.starts_with("/Users/x/.nvm/versions/node/v22/bin:"));
assert!(combined.contains("/opt/homebrew/bin"));
assert_eq!(prepend_program_dir(Path::new("bob"), "/usr/bin"), "/usr/bin");
}
// NOTE(review): the result depends on the host's PATH and login shell, so
// this test is environment-sensitive.
#[test]
fn augmented_node_path_is_nonempty_and_resolves_system_bin() {
let path = augmented_node_path();
assert!(!path.is_empty());
assert!(path.contains("/usr/bin"), "system bin must always resolve");
}
}
// End-to-end tests that spawn real processes (`printf`, `sh`, `sleep`); they
// are compiled on Unix only.
#[cfg(all(test, unix))]
mod lifecycle {
use super::*;
use std::sync::Condvar;
use std::time::Instant;
// Completion flag plus the condition variable used to wait on it.
type Done = Arc<(Mutex<bool>, Condvar)>;
// Builds a callback that records every event and sets the `Done` flag when it
// sees a terminal event (`Exited` or `Error`). Also returns the shared event
// log and the flag.
fn collector() -> (
impl FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
Arc<Mutex<Vec<ProcessEvent>>>,
Done,
) {
let events = Arc::new(Mutex::new(Vec::new()));
let done: Done = Arc::new((Mutex::new(false), Condvar::new()));
let cb = {
let events = Arc::clone(&events);
let done = Arc::clone(&done);
move |ev: ProcessEvent| {
let terminal =
matches!(ev, ProcessEvent::Exited { .. } | ProcessEvent::Error { .. });
// Record the event before signalling, so a woken waiter sees it in the log.
events.lock().unwrap().push(ev);
if terminal {
let (lock, cvar) = &*done;
*lock.lock().unwrap() = true;
cvar.notify_all();
}
}
};
(cb, events, done)
}
// Blocks until the flag is set; fails the test once `secs` seconds have
// passed without it being set.
fn wait_done(done: &Done, secs: u64) {
let (lock, cvar) = &**done;
let mut finished = lock.lock().unwrap();
let deadline = Instant::now() + Duration::from_secs(secs);
// Loop because a condvar wait can return before the flag is set.
while !*finished {
let now = Instant::now();
assert!(now < deadline, "process did not finish within {secs}s");
let (guard, _) = cvar.wait_timeout(finished, deadline - now).unwrap();
finished = guard;
}
}
// Runs `program` with `args` in the current directory under run id "t",
// waits up to 10 seconds for a terminal event and returns a copy of the
// recorded events.
fn run(program: &str, args: &[&str]) -> Vec<ProcessEvent> {
let (cb, events, done) = collector();
let _handle = spawn_streaming(
PathBuf::from(program),
args.iter().map(|s| (*s).to_owned()).collect(),
Vec::new(),
PathBuf::from("."),
"t".to_owned(),
cb,
)
.expect("spawn");
wait_done(&done, 10);
let events = events.lock().unwrap();
events.clone()
}
// Started comes first, Exited(0) last, and stdout lines arrive in order.
#[test]
fn streams_stdout_lines_then_exits_zero() {
let events = run("printf", &["%s\n", "alpha", "beta"]);
assert!(matches!(events.first(), Some(ProcessEvent::Started { .. })));
assert!(matches!(
events.last(),
Some(ProcessEvent::Exited { exit_code: Some(0), cancelled: false, .. })
));
let lines: Vec<&str> = events
.iter()
.filter_map(|e| match e {
ProcessEvent::Stdout { line, .. } => Some(line.as_str()),
_ => None,
})
.collect();
assert_eq!(lines, vec!["alpha", "beta"]);
}
// A non-zero exit status is passed through in `exit_code`.
#[test]
fn nonzero_exit_code_is_reported() {
let events = run("sh", &["-c", "exit 3"]);
assert!(matches!(
events.last(),
Some(ProcessEvent::Exited { exit_code: Some(3), cancelled: false, .. })
));
}
// Variables passed through `env` are visible inside the child.
#[test]
fn env_vars_are_passed_to_the_child() {
let (cb, events, done) = collector();
let _handle = spawn_streaming(
PathBuf::from("sh"),
vec!["-c".to_owned(), "printf '%s\\n' \"$CLI_STREAM_STUB\"".to_owned()],
vec![("CLI_STREAM_STUB".to_owned(), "from-env".to_owned())],
PathBuf::from("."),
"t".to_owned(),
cb,
)
.expect("spawn");
wait_done(&done, 10);
let events = events.lock().unwrap();
assert!(
events
.iter()
.any(|e| matches!(e, ProcessEvent::Stdout { line, .. } if line == "from-env")),
"child should observe the injected env var, got {events:?}"
);
}
// Output written to stderr shows up as Stderr events only.
#[test]
fn stderr_is_streamed_and_not_misrouted_to_stdout() {
let events = run("sh", &["-c", "echo to-stderr 1>&2"]);
assert!(events
.iter()
.any(|e| matches!(e, ProcessEvent::Stderr { line, .. } if line == "to-stderr")));
assert!(!events.iter().any(|e| matches!(e, ProcessEvent::Stdout { .. })));
assert!(events
.iter()
.any(|e| matches!(e, ProcessEvent::Exited { exit_code: Some(0), .. })));
}
// Cancelling a 10-second sleep must end the run within 4 seconds and report
// `cancelled: true`.
#[test]
fn cancel_promptly_terminates_the_run_and_flags_it() {
let (cb, events, done) = collector();
let handle = spawn_streaming(
PathBuf::from("sh"),
// `exec` replaces the shell, so the signal reaches `sleep` directly.
vec!["-c".to_owned(), "exec sleep 10".to_owned()],
Vec::new(),
PathBuf::from("."),
"t".to_owned(),
cb,
)
.expect("spawn");
let canceller = handle.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let _ = canceller.cancel();
});
wait_done(&done, 4);
assert!(handle.was_cancelled());
let events = events.lock().unwrap();
assert!(
matches!(events.last(), Some(ProcessEvent::Exited { cancelled: true, .. })),
"expected Exited(cancelled=true), got {:?}",
events.last()
);
}
// A program that cannot be found surfaces as `StreamError::Spawn` carrying
// the program name and a NotFound I/O error.
#[test]
fn spawning_a_missing_binary_is_err() {
let result = spawn_streaming(
PathBuf::from("cli-stream-no-such-binary-zzz"),
Vec::new(),
Vec::new(),
PathBuf::from("."),
"t".to_owned(),
|_ev: ProcessEvent| {},
);
match result {
Err(StreamError::Spawn { program, source }) => {
assert!(program.contains("cli-stream-no-such-binary-zzz"));
assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
}
other => panic!("expected StreamError::Spawn, got {other:?}"),
}
}
}