cli_stream/process.rs
1//! Generic streaming subprocess engine — the shared core behind every
2//! process-backed harness (bob, Claude Code, Codex, …).
3//!
4//! Spawns a child, pipes stdout/stderr line-by-line through a callback
5//! as [`ProcessEvent`]s, augments PATH so Node-based CLIs resolve even
6//! from a Finder-launched `.app`, and hands back a [`ProcessHandle`] for
7//! cancellation (SIGTERM → SIGKILL). No harness-trait or bob knowledge —
8//! purely subprocess streaming.
9//!
//! Cancellation is the wrinkle: a run needs to be stoppable mid-stream
//! when the user closes the tab or hits "stop". `ProcessHandle::cancel()`
//! sets an atomic `cancelled` flag and sends SIGTERM (with a SIGKILL
//! fallback; an immediate kill on non-Unix). The reader threads simply run
//! until the pipes close; the exit watcher reports the flag on the final
//! `Exited` event.
14
15use crate::error::StreamError;
16use serde::Serialize;
17use std::io::{BufRead, BufReader, Read};
18use std::path::{Path, PathBuf};
19use std::process::{Child, Command, Stdio};
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{mpsc, Arc, Mutex, OnceLock};
22use std::thread;
23use std::time::Duration;
24
/// Raw events emitted to the caller's callback during a streaming run.
/// JSON-tagged so axum SSE and Tauri Channel render identical payloads
/// on the wire. Harness-neutral: a process-backed adapter parses the
/// `Stdout` lines into a normalized event vocabulary (e.g. `agent-harness`'s `RunEvent`).
///
/// Wire shape: the variant name is written into a `"kind"` field. `rename_all`
/// on an enum renames the *variants* only (`"started"`, `"stdout"`, `"stderr"`,
/// `"error"`, `"exited"`); payload field names are serialized as written
/// (`run_id`, `exit_code`, …), not camelCased.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
// New lifecycle events can be added without breaking downstream matches —
// consumers in other crates must carry a `_` arm. (Construction of existing
// variants is unaffected, so it's still ergonomic to build them.)
#[non_exhaustive]
pub enum ProcessEvent {
    /// First event, emitted synchronously by `spawn_streaming` before the
    /// reader threads start, so the UI can show a "thinking…" state before
    /// any output event arrives.
    Started { run_id: String },
    /// Raw stdout line, without its trailing newline. Process-backed CLIs
    /// emit one JSON object per line in their streaming mode. The caller parses.
    Stdout { run_id: String, line: String },
    /// Raw stderr line, without its trailing newline. Warnings + the
    /// occasional error.
    Stderr { run_id: String, line: String },
    /// A stream-read or `wait` failure observed during the run. Spawn
    /// failures are not reported here: `spawn_streaming` returns them as
    /// `Err`. A read failure stops only that stream's reader.
    Error { run_id: String, message: String },
    /// Process exited. Emitted by the exit watcher after both output streams
    /// have been drained, so no `Stdout`/`Stderr` event follows it.
    Exited {
        run_id: String,
        /// The process's exit code; `None` when it was terminated by a
        /// signal (Unix) — e.g. by `cancel()`'s SIGTERM/SIGKILL.
        exit_code: Option<i32>,
        /// True if `cancel()` was called on this run's handle before the
        /// exit watcher built this event.
        cancelled: bool,
    },
}
54
/// Handle to an in-flight streaming run. Caller stores it (e.g. in a
/// runId-keyed map) so a later `cancel()` can find it.
///
/// Cloning is cheap: every clone shares the same child and `cancelled`
/// flag through an `Arc`, so cancelling via any clone cancels the run.
///
/// Dropping the handle does NOT cancel the run — the reader threads +
/// wait thread continue independently. Use `cancel()` explicitly when
/// the user closes the connection.
#[derive(Clone, Debug)]
pub struct ProcessHandle {
    inner: Arc<HandleInner>,
}
65
/// State shared by every clone of a [`ProcessHandle`] and the exit watcher.
#[derive(Debug)]
struct HandleInner {
    /// The spawned child. The exit watcher sets this to `None` once it has
    /// emitted its final event; `cancel()` treats `None` as "already exited".
    child: Mutex<Option<Child>>,
    /// Set by `cancel()`; read by the exit watcher when reporting the exit.
    cancelled: AtomicBool,
}
71
72impl ProcessHandle {
73 /// SIGTERM the process, then SIGKILL after 1.5s if it's still alive.
74 /// The CLI is supposed to flush a final result on SIGTERM but we
75 /// don't trust it to do so forever.
76 pub fn cancel(&self) -> Result<(), StreamError> {
77 self.inner.cancelled.store(true, Ordering::SeqCst);
78 let mut guard = self
79 .inner
80 .child
81 .lock()
82 .map_err(|_| StreamError::CancelLockPoisoned)?;
83 let Some(child) = guard.as_mut() else {
84 // Already exited.
85 return Ok(());
86 };
87 // Best-effort SIGTERM. On Unix, kill() sends SIGKILL by default;
88 // we use libc::kill for SIGTERM, falling back to child.kill() if
89 // the libc call fails. On Windows there's only TerminateProcess
90 // via .kill().
91 #[cfg(unix)]
92 {
93 let pid = child.id() as i32;
94 // SAFETY: pid is the child's PID owned by this Child; sending
95 // SIGTERM is well-defined.
96 unsafe { libc::kill(pid, libc::SIGTERM) };
97 // Spawn the SIGKILL fallback inline to avoid holding the mutex
98 // while sleeping.
99 let inner = Arc::clone(&self.inner);
100 thread::spawn(move || {
101 thread::sleep(Duration::from_millis(1500));
102 if let Ok(mut guard) = inner.child.lock() {
103 if let Some(child) = guard.as_mut() {
104 let _ = child.kill();
105 }
106 }
107 });
108 }
109 #[cfg(not(unix))]
110 {
111 let _ = child.kill();
112 }
113 Ok(())
114 }
115
116 /// Whether `cancel()` was called. Tagged on the final `Exited` event.
117 pub fn was_cancelled(&self) -> bool {
118 self.inner.cancelled.load(Ordering::SeqCst)
119 }
120}
121
122/// Spawn an arbitrary streaming child process — the generic engine behind
123/// every process-backed harness (bob, Claude Code, Codex).
124///
125/// Pipes stdout/stderr line-by-line through `callback` using the raw
126/// [`ProcessEvent`] vocabulary (Started / Stdout / Stderr / Error /
127/// Exited). `env` supplies per-harness secrets (each harness's API-key
128/// var, or none for self-authenticating CLIs). PATH is augmented so
129/// Node-based CLIs find `node`. Returns a [`ProcessHandle`] for
130/// cancellation.
131///
132/// `callback` is invoked from three threads (stdout reader, stderr
133/// reader, exit watcher); the `Clone` bound lets us hand a copy to each.
134/// `run_id` is opaque — the caller chooses it and uses it to correlate
135/// events with the handle.
136///
137/// ```no_run
138/// use cli_stream::{spawn_streaming, ProcessEvent};
139/// use std::path::PathBuf;
140///
141/// # fn main() -> Result<(), cli_stream::StreamError> {
142/// let handle = spawn_streaming(
143/// PathBuf::from("echo"),
144/// vec!["hello".to_owned()],
145/// Vec::new(), // extra env vars (key, value)
146/// std::env::current_dir().unwrap(),
147/// "run-1".to_owned(), // your correlation id
148/// |event| match event {
149/// ProcessEvent::Stdout { line, .. } => println!("{line}"),
150/// ProcessEvent::Exited { exit_code, .. } => eprintln!("exit {exit_code:?}"),
151/// _ => {}
152/// },
153/// )?;
154/// // `handle.cancel()` stops it early; dropping the handle does not.
155/// let _ = handle;
156/// # Ok(())
157/// # }
158/// ```
159pub fn spawn_streaming<F>(
160 program: PathBuf,
161 args: Vec<String>,
162 env: Vec<(String, String)>,
163 cwd: PathBuf,
164 run_id: String,
165 callback: F,
166) -> Result<ProcessHandle, StreamError>
167where
168 F: FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
169{
170 // PATH augmentation: Node-based CLIs (bob, claude, codex) expect
171 // `node` (and often `npm`, `git`) on PATH. A desktop app launched
172 // from Finder/Launchpad inherits only the minimal launchd PATH
173 // (`/usr/bin:/bin:/usr/sbin:/sbin`), so an nvm-installed node is
174 // invisible and the child exits 127 ("command not found").
175 //
176 // Fix: prepend the program's parent dir (where node also lives in an
177 // nvm install) to the child's PATH. Added, not replaced, so a PATH
178 // the user explicitly set still wins on later lookups.
179 let augmented_path = augment_path_for_node(&program);
180
181 let mut command = Command::new(&program);
182 command
183 .args(&args)
184 .current_dir(&cwd)
185 .env("PATH", augmented_path)
186 .stdin(Stdio::null())
187 .stdout(Stdio::piped())
188 .stderr(Stdio::piped());
189 for (key, value) in &env {
190 command.env(key, value);
191 }
192 let mut child = command.spawn().map_err(|source| StreamError::Spawn {
193 program: program.display().to_string(),
194 source,
195 })?;
196
197 let stdout = child
198 .stdout
199 .take()
200 .ok_or(StreamError::PipeNotCaptured { stream: "stdout" })?;
201 let stderr = child
202 .stderr
203 .take()
204 .ok_or(StreamError::PipeNotCaptured { stream: "stderr" })?;
205
206 let inner = Arc::new(HandleInner {
207 child: Mutex::new(Some(child)),
208 cancelled: AtomicBool::new(false),
209 });
210 let handle = ProcessHandle { inner: Arc::clone(&inner) };
211
212 // Emit Started immediately so the caller doesn't wait on the first
213 // output line for a UI signal.
214 let mut started_cb = callback.clone();
215 started_cb(ProcessEvent::Started { run_id: run_id.clone() });
216
217 // Reader threads. Each owns its own callback clone — the Clone bound
218 // is the whole point.
219 let stdout_cb = callback.clone();
220 let stdout_run_id = run_id.clone();
221 let stdout_handle = thread::spawn(move || {
222 pump_lines(stdout, stdout_run_id, true, stdout_cb);
223 });
224
225 let stderr_cb = callback.clone();
226 let stderr_run_id = run_id.clone();
227 let stderr_handle = thread::spawn(move || {
228 pump_lines(stderr, stderr_run_id, false, stderr_cb);
229 });
230
231 // Exit watcher — emits the terminal Exited event with the cancellation
232 // flag. It must NOT hold the child lock across a blocking `wait()`:
233 // `cancel()` needs that same lock to signal the child, so a held lock
234 // would block cancel until the process exited on its own (defeating it).
235 // Instead poll `try_wait()`, locking only for each non-blocking check and
236 // releasing between polls so `cancel()` can acquire the lock mid-run.
237 let exit_inner = Arc::clone(&inner);
238 let mut exit_cb = callback;
239 let exit_run_id = run_id;
240 thread::spawn(move || {
241 let wait_result = loop {
242 {
243 let mut guard = match exit_inner.child.lock() {
244 Ok(guard) => guard,
245 Err(_) => return, // poisoned — nothing safe to do
246 };
247 match guard.as_mut() {
248 Some(child) => match child.try_wait() {
249 Ok(Some(status)) => break Ok(status),
250 Ok(None) => {} // still running; poll again
251 Err(err) => break Err(err),
252 },
253 None => return, // already reaped
254 }
255 } // lock released before sleeping, so cancel() can acquire it
256 thread::sleep(Duration::from_millis(50));
257 };
258 let _ = stdout_handle.join();
259 let _ = stderr_handle.join();
260 let cancelled = exit_inner.cancelled.load(Ordering::SeqCst);
261
262 match wait_result {
263 Ok(status) => exit_cb(ProcessEvent::Exited {
264 run_id: exit_run_id.clone(),
265 exit_code: status.code(),
266 cancelled,
267 }),
268 Err(err) => exit_cb(ProcessEvent::Error {
269 run_id: exit_run_id.clone(),
270 message: format!("wait failed: {err}"),
271 }),
272 }
273
274 // Drop the child handle so subsequent cancel() calls
275 // short-circuit cleanly.
276 if let Ok(mut guard) = exit_inner.child.lock() {
277 *guard = None;
278 }
279 });
280
281 Ok(handle)
282}
283
284fn pump_lines<R, F>(reader: R, run_id: String, is_stdout: bool, mut callback: F)
285where
286 R: Read,
287 F: FnMut(ProcessEvent),
288{
289 let buffered = BufReader::new(reader);
290 for line in buffered.lines() {
291 match line {
292 Ok(text) => {
293 let event = if is_stdout {
294 ProcessEvent::Stdout { run_id: run_id.clone(), line: text }
295 } else {
296 ProcessEvent::Stderr { run_id: run_id.clone(), line: text }
297 };
298 callback(event);
299 }
300 Err(err) => {
301 callback(ProcessEvent::Error {
302 run_id: run_id.clone(),
303 message: format!("stream read failed: {err}"),
304 });
305 return;
306 }
307 }
308 }
309}
310
311/// Compose a PATH for the spawned process that always includes the
312/// directory containing the program — where `node`, `npm`, and friends
313/// usually live in an nvm install. The user's existing PATH stays as a
314/// fallback after our prepended directory.
315fn augment_path_for_node(program: &Path) -> String {
316 prepend_program_dir(program, &augmented_node_path())
317}
318
/// Prepend the directory containing `program` (where `node` also lives in an
/// nvm install) to `base_path`, so the resolved binary's own dir is searched
/// first. Pure (no env / no spawn) so it's unit-tested directly.
///
/// The directory is prepended only when it is a usable PATH entry:
/// - it must be absolute. A relative dir (`./bob` → `.`, `node_modules/.bin/bob`)
///   resolves against the child's cwd, the user's workspace. That is the same
///   planted-binary risk `keep_absolute_entries` strips from the base PATH.
/// - it must be valid UTF-8 and free of `:`, which a PATH cannot represent.
///
/// An empty `base_path` yields just the directory, never `dir:`. A trailing
/// empty PATH entry would mean "the current directory".
fn prepend_program_dir(program: &Path, base_path: &str) -> String {
    let dir = program
        .parent()
        .and_then(Path::to_str)
        .filter(|dir| dir.starts_with('/') && !dir.contains(':'));
    match dir {
        Some(dir) if base_path.is_empty() => dir.to_owned(),
        Some(dir) => format!("{dir}:{base_path}"),
        None => base_path.to_owned(),
    }
}
332
333/// A PATH that resolves Node-based CLIs (bob, claude, codex) even from a
334/// process launched by Finder/Launchpad, which inherits only the minimal
335/// launchd PATH (`/usr/bin:/bin:/usr/sbin:/sbin`) rather than the user's
336/// shell PATH.
337///
338/// Strategy: keep the process's own PATH first (an explicit PATH still wins),
339/// then append the user's **real** PATH as resolved by their login shell —
340/// which sources their rc, so it knows where nvm / pnpm / volta / asdf / fnm /
341/// Homebrew put `node`, with no guessing. If the shell query is unavailable
342/// (no `$SHELL`, a timeout, a sandboxed app that can't spawn, …) we fall back
343/// to a hardcoded best-effort list, so we're never worse than before.
344///
345/// Used by the run path (which prepends the resolved binary's own dir on top
346/// of this) and by readiness probes that locate `claude`/`codex` via a bare
347/// `Command::new(name)`. Computed once and cached for the process — the
348/// (bounded) shell spawn happens at most once per launch, lazily on the first
349/// readiness/run/login, never at construction.
350pub fn augmented_node_path() -> String {
351 static CACHED: OnceLock<String> = OnceLock::new();
352 CACHED.get_or_init(compute_augmented_node_path).clone()
353}
354
355fn compute_augmented_node_path() -> String {
356 let mut parts: Vec<String> = Vec::new();
357 // The process's own PATH first — anything explicitly set still wins.
358 if let Ok(existing) = std::env::var("PATH") {
359 if !existing.is_empty() {
360 parts.push(existing);
361 }
362 }
363 // The user's real PATH (nvm/pnpm/volta/asdf/Homebrew) via their login
364 // shell; a hardcoded best-effort list if that's unavailable.
365 parts.push(login_shell_path().unwrap_or_else(hardcoded_node_dirs));
366 keep_absolute_entries(&parts.join(":"))
367}
368
/// Filter a colon-separated PATH down to its **absolute** entries, keeping
/// their order. Relative and empty entries (`.`, `""`, a direnv-style
/// `node_modules/.bin`) are dropped.
///
/// Security: the child runs with `current_dir` set to the user's workspace.
/// That is where the agent itself writes files and where synced/downloaded
/// content lands. A relative or empty entry resolves against that cwd, so it
/// could run a planted `node`/`claude`. An empty entry is the classic
/// implicit-cwd vector. Absolute dirs only.
fn keep_absolute_entries(path: &str) -> String {
    let mut kept = String::with_capacity(path.len());
    for entry in path.split(':') {
        if !entry.starts_with('/') {
            continue;
        }
        if !kept.is_empty() {
            kept.push(':');
        }
        kept.push_str(entry);
    }
    kept
}
381
382/// Resolve PATH by asking the user's login + interactive shell — it sources
383/// their rc, so it knows wherever any node manager (nvm / pnpm / volta / asdf /
384/// fnm / Homebrew) put `node`, without us guessing. Bounded by a timeout so a
385/// slow or interactive rc can't hang us; returns `None` (→ hardcoded fallback)
386/// on any failure: no `$SHELL`, spawn refused (e.g. a sandboxed app), timeout,
387/// or no PATH in the output. Reads PATH from `env` (OS colon format,
388/// shell-agnostic — works for fish too) rather than expanding `$PATH`.
389///
390/// This *executes the user's shell rc*, exactly as opening a terminal does —
391/// their own shell, on their own machine. It is not a privilege/auth step: no
392/// "login session" is created; `-l`/`-i` only select which startup files are
393/// sourced (login profiles + the interactive rc where nvm usually lives).
394/// Printed on its own line right before `env`, so the parser can skip any
395/// shell-init chatter / terminal escape sequences (e.g. iTerm2 shell
396/// integration's `]1337;…` OSC codes) the interactive shell emits before our
397/// command runs — which would otherwise prepend to the `PATH=` line.
398const PATH_SENTINEL: &str = "__CLI_STREAM_PATH__";
399
400#[cfg(unix)]
401fn login_shell_path() -> Option<String> {
402 let shell = std::env::var("SHELL").ok().filter(|s| !s.is_empty())?;
403 // Print a sentinel line, then dump the environment. Reading PATH from `env`
404 // (not by expanding `$PATH`) keeps it OS colon format and shell-agnostic
405 // (fish stores PATH as a list); the sentinel lets the parser ignore
406 // anything the interactive shell prints at startup before `env` runs.
407 let script = format!("printf '\\n{PATH_SENTINEL}\\n'; env");
408 let mut child = Command::new(&shell)
409 .arg("-lic") // -l: login profiles, -i: interactive rc (nvm), -c: command
410 .arg(&script)
411 .stdin(Stdio::null())
412 .stdout(Stdio::piped())
413 .stderr(Stdio::null())
414 .spawn()
415 .ok()?;
416 // Read on a worker thread so the whole query can be bounded by a timeout —
417 // a misbehaving rc must not hang the app. Read bytes + lossy-decode (rather
418 // than `read_to_string`) so non-UTF-8 in the env dump degrades to
419 // replacement chars instead of discarding the whole output.
420 let mut stdout = child.stdout.take()?;
421 let (tx, rx) = mpsc::channel();
422 thread::spawn(move || {
423 let mut buf = Vec::new();
424 let _ = stdout.read_to_end(&mut buf);
425 let _ = tx.send(String::from_utf8_lossy(&buf).into_owned());
426 });
427 // 4s: generous enough for a heavy rc (oh-my-zsh + plugins + nvm lazy-load)
428 // to finish, since this is paid at most once (cached); on timeout we kill
429 // the shell and fall back to the hardcoded list.
430 let output = match rx.recv_timeout(Duration::from_secs(4)) {
431 Ok(buf) => buf,
432 Err(_) => {
433 let _ = child.kill();
434 let _ = child.wait();
435 return None;
436 }
437 };
438 let _ = child.wait();
439 parse_path_from_shell_output(&output)
440}
441
/// Non-Unix stand-in: there is no login-shell PATH query here, so callers
/// always take the hardcoded fallback.
#[cfg(not(unix))]
fn login_shell_path() -> Option<String> {
    Option::None
}
446
447/// Extract the `PATH=…` value from the shell's `printf <sentinel>; env` output.
448/// Everything up to (and including) the last sentinel is discarded — that's
449/// where shell-init chatter and terminal escape sequences live — then the
450/// `PATH=` line is read from the clean `env` dump that follows. `None` if the
451/// sentinel is missing (query misbehaved) or PATH is absent/empty.
452fn parse_path_from_shell_output(output: &str) -> Option<String> {
453 output
454 .rsplit_once(PATH_SENTINEL)?
455 .1
456 .lines()
457 .find_map(|line| line.strip_prefix("PATH="))
458 .map(str::trim)
459 .filter(|p| !p.is_empty())
460 .map(str::to_owned)
461}
462
/// Hardcoded best-effort node locations, used as the fallback when the
/// login-shell query is unavailable.
///
/// The list leans on the *universal* dirs shared by every distro and macOS.
/// `/usr/bin` + `/usr/local/bin` are where apt/dnf/yum/pacman and the official
/// Node tarball install, so the common Linux container case is covered
/// without distro-specific guessing. On top of those come macOS Homebrew,
/// `~/.local/bin` (the official-installer dir) and every nvm-managed node's
/// `bin`.
///
/// Manager-specific locations (pnpm/volta/asdf, Linuxbrew, snap, …) are left
/// to the login-shell query. A missing dir is simply skipped at lookup time,
/// so this is never worse than the bare launchd PATH.
fn hardcoded_node_dirs() -> String {
    let mut dirs: Vec<String> = vec![String::from(
        "/usr/local/bin:/opt/homebrew/bin:/usr/bin:/bin:/usr/sbin:/sbin",
    )];
    let home = match std::env::var("HOME") {
        Ok(value) if !value.is_empty() => value,
        _ => return dirs.join(":"),
    };
    let home = Path::new(&home);
    // Official-installer location for several agent CLIs.
    dirs.push(home.join(".local/bin").display().to_string());
    // nvm: ~/.nvm/versions/node/<version>/bin — where npm-global CLIs
    // (bob, claude, codex) live under an nvm-managed node.
    if let Ok(versions) = std::fs::read_dir(home.join(".nvm/versions/node")) {
        dirs.extend(
            versions
                .flatten()
                .map(|version| version.path().join("bin"))
                .filter(|bin| bin.is_dir())
                .map(|bin| bin.display().to_string()),
        );
    }
    dirs.join(":")
}
493
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hardcoded_fallback_includes_macos_defaults() {
        // With no usable login shell, the fallback alone must still carry
        // Homebrew + the system bins, so a launchd-spawned `.app` resolves
        // CLIs anyway — the original "not installed" fix.
        let fallback = hardcoded_node_dirs();
        let expectations = [
            ("/opt/homebrew/bin", "missing Apple-Silicon Homebrew bin"),
            ("/usr/local/bin", "missing Intel Homebrew / system bin"),
            ("/usr/bin", "missing system bin"),
        ];
        for (dir, failure) in expectations {
            assert!(fallback.contains(dir), "{}", failure);
        }
    }

    #[test]
    fn parse_path_from_shell_output_skips_chatter_before_the_sentinel() {
        // Shell startup can emit iTerm2 OSC escapes and a banner — even a decoy
        // `PATH=` line — ahead of our sentinel + `env` dump. Only the PATH= line
        // after the sentinel may be picked up.
        let noisy = "\u{1b}]1337;RemoteHost=x\u{7}welcome banner\nPATH=/decoy\n__CLI_STREAM_PATH__\nHOME=/Users/x\nPATH=/opt/homebrew/bin:/usr/bin\nLANG=en_US";
        let parsed = parse_path_from_shell_output(noisy);
        assert_eq!(parsed.as_deref(), Some("/opt/homebrew/bin:/usr/bin"));

        // Each of these must yield None so the caller falls back: no sentinel
        // (even with a bare PATH=), sentinel without PATH, sentinel with an
        // empty PATH.
        for malformed in [
            "PATH=/usr/bin",
            "__CLI_STREAM_PATH__\nFOO=bar",
            "__CLI_STREAM_PATH__\nPATH=\nFOO=bar",
        ] {
            assert_eq!(parse_path_from_shell_output(malformed), None);
        }
    }

    #[test]
    fn keep_absolute_entries_drops_relative_and_empty() {
        // Relative (`node_modules/.bin`, `.`) and empty entries resolve against
        // the spawn cwd (the user's workspace), so they are dropped; absolute
        // dirs survive in order.
        let cases = [
            (
                "/opt/homebrew/bin:node_modules/.bin:/usr/bin:.::/bin",
                "/opt/homebrew/bin:/usr/bin:/bin",
            ),
            ("/usr/bin", "/usr/bin"),
            // All-relative → empty (the caller still has the process PATH ahead of it).
            (".:rel:", ""),
        ];
        for (input, expected) in cases {
            assert_eq!(keep_absolute_entries(input), expected);
        }
    }

    #[test]
    fn prepend_program_dir_puts_the_binary_dir_first() {
        let nvm_bob = Path::new("/Users/x/.nvm/versions/node/v22/bin/bob");
        let combined = prepend_program_dir(nvm_bob, "/opt/homebrew/bin:/usr/bin");
        assert!(combined.starts_with("/Users/x/.nvm/versions/node/v22/bin:"));
        assert!(combined.contains("/opt/homebrew/bin"));
        // A bare program name has no parent dir → base path unchanged.
        let bare = prepend_program_dir(Path::new("bob"), "/usr/bin");
        assert_eq!(bare, "/usr/bin");
    }

    #[test]
    fn augmented_node_path_is_nonempty_and_resolves_system_bin() {
        // Exercises the cached public path once. `/usr/bin` appears whether the
        // shell query succeeds (real PATH) or falls back (hardcoded), and is on
        // the bare launchd PATH too — so this holds in any environment.
        let resolved = augmented_node_path();
        assert!(!resolved.is_empty());
        assert!(resolved.contains("/usr/bin"), "system bin must always resolve");
    }
}
564
/// End-to-end lifecycle tests that spawn real processes. Unix-only: they use
/// `printf` / `sh` / `sleep`, and the cancel path is signal-based here.
#[cfg(all(test, unix))]
mod lifecycle {
    use super::*;
    use std::sync::Condvar;
    use std::time::Instant;

    /// Shared "run finished" flag + condvar: set by the collector callback,
    /// awaited by `wait_done`.
    type Done = Arc<(Mutex<bool>, Condvar)>;

    /// A thread-safe event collector. Every event is appended to the shared
    /// vector, and `done` is set on the first `Exited` *or* `Error` event.
    /// Note that `Error` can also be a mid-run event (a stream read failure),
    /// in which case later events may still arrive after `done` is set.
    /// Returns the (cloneable) callback + the shared collections.
    fn collector() -> (
        impl FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
        Arc<Mutex<Vec<ProcessEvent>>>,
        Done,
    ) {
        let events = Arc::new(Mutex::new(Vec::new()));
        let done: Done = Arc::new((Mutex::new(false), Condvar::new()));
        let cb = {
            let events = Arc::clone(&events);
            let done = Arc::clone(&done);
            move |ev: ProcessEvent| {
                let terminal =
                    matches!(ev, ProcessEvent::Exited { .. } | ProcessEvent::Error { .. });
                events.lock().unwrap().push(ev);
                if terminal {
                    let (lock, cvar) = &*done;
                    *lock.lock().unwrap() = true;
                    cvar.notify_all();
                }
            }
        };
        (cb, events, done)
    }

    /// Block until the terminal event fires, or panic after `secs`.
    fn wait_done(done: &Done, secs: u64) {
        let (lock, cvar) = &**done;
        let mut finished = lock.lock().unwrap();
        let deadline = Instant::now() + Duration::from_secs(secs);
        while !*finished {
            let now = Instant::now();
            assert!(now < deadline, "process did not finish within {secs}s");
            // `wait_timeout` may wake spuriously; the loop re-checks the flag.
            let (guard, _) = cvar.wait_timeout(finished, deadline - now).unwrap();
            finished = guard;
        }
    }

    /// Spawn `program args` (no extra env, cwd "."), block until it exits
    /// (10s cap), and return every event in the order the callback saw them.
    fn run(program: &str, args: &[&str]) -> Vec<ProcessEvent> {
        let (cb, events, done) = collector();
        let _handle = spawn_streaming(
            PathBuf::from(program),
            args.iter().map(|s| (*s).to_owned()).collect(),
            Vec::new(),
            PathBuf::from("."),
            "t".to_owned(),
            cb,
        )
        .expect("spawn");
        wait_done(&done, 10);
        let events = events.lock().unwrap();
        events.clone()
    }

    #[test]
    fn streams_stdout_lines_then_exits_zero() {
        let events = run("printf", &["%s\n", "alpha", "beta"]);
        // Started leads, Exited(0, not cancelled) closes.
        assert!(matches!(events.first(), Some(ProcessEvent::Started { .. })));
        assert!(matches!(
            events.last(),
            Some(ProcessEvent::Exited { exit_code: Some(0), cancelled: false, .. })
        ));
        // Lines arrive in order, one event each.
        let lines: Vec<&str> = events
            .iter()
            .filter_map(|e| match e {
                ProcessEvent::Stdout { line, .. } => Some(line.as_str()),
                _ => None,
            })
            .collect();
        assert_eq!(lines, vec!["alpha", "beta"]);
    }

    #[test]
    fn nonzero_exit_code_is_reported() {
        let events = run("sh", &["-c", "exit 3"]);
        assert!(matches!(
            events.last(),
            Some(ProcessEvent::Exited { exit_code: Some(3), cancelled: false, .. })
        ));
    }

    #[test]
    fn env_vars_are_passed_to_the_child() {
        // The `env` argument must reach the child's environment — exercise it
        // directly (the other lifecycle tests pass an empty env).
        let (cb, events, done) = collector();
        let _handle = spawn_streaming(
            PathBuf::from("sh"),
            vec!["-c".to_owned(), "printf '%s\\n' \"$CLI_STREAM_STUB\"".to_owned()],
            vec![("CLI_STREAM_STUB".to_owned(), "from-env".to_owned())],
            PathBuf::from("."),
            "t".to_owned(),
            cb,
        )
        .expect("spawn");
        wait_done(&done, 10);
        let events = events.lock().unwrap();
        assert!(
            events
                .iter()
                .any(|e| matches!(e, ProcessEvent::Stdout { line, .. } if line == "from-env")),
            "child should observe the injected env var, got {events:?}"
        );
    }

    #[test]
    fn stderr_is_streamed_and_not_misrouted_to_stdout() {
        let events = run("sh", &["-c", "echo to-stderr 1>&2"]);
        assert!(events
            .iter()
            .any(|e| matches!(e, ProcessEvent::Stderr { line, .. } if line == "to-stderr")));
        assert!(!events.iter().any(|e| matches!(e, ProcessEvent::Stdout { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, ProcessEvent::Exited { exit_code: Some(0), .. })));
    }

    #[test]
    fn cancel_promptly_terminates_the_run_and_flags_it() {
        // A 10s sleeper we cancel ~immediately; a working engine must kill it
        // far sooner than 10s. `exec` so the process *is* sleep (no orphan).
        let (cb, events, done) = collector();
        let handle = spawn_streaming(
            PathBuf::from("sh"),
            vec!["-c".to_owned(), "exec sleep 10".to_owned()],
            Vec::new(),
            PathBuf::from("."),
            "t".to_owned(),
            cb,
        )
        .expect("spawn");

        // Cancel from another thread after a short delay, so the signal lands
        // while the child is already running and the test thread is parked in
        // `wait_done`.
        let canceller = handle.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            let _ = canceller.cancel();
        });

        // Correct cancellation terminates the 10s sleep within a few seconds.
        wait_done(&done, 4);
        assert!(handle.was_cancelled());
        let events = events.lock().unwrap();
        assert!(
            matches!(events.last(), Some(ProcessEvent::Exited { cancelled: true, .. })),
            "expected Exited(cancelled=true), got {:?}",
            events.last()
        );
    }

    #[test]
    fn spawning_a_missing_binary_is_err() {
        let result = spawn_streaming(
            PathBuf::from("cli-stream-no-such-binary-zzz"),
            Vec::new(),
            Vec::new(),
            PathBuf::from("."),
            "t".to_owned(),
            |_ev: ProcessEvent| {},
        );
        // Typed: a `Spawn` error carrying the OS `NotFound` io::Error as its
        // source — the whole point of `StreamError` over a `String`. A caller
        // can branch on `ErrorKind` to tell "not installed" (NotFound) from
        // "permission denied", which a flattened string can't support.
        match result {
            Err(StreamError::Spawn { program, source }) => {
                assert!(program.contains("cli-stream-no-such-binary-zzz"));
                assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
            }
            other => panic!("expected StreamError::Spawn, got {other:?}"),
        }
    }
}