Skip to main content

cli_stream/
process.rs

1//! Generic streaming subprocess engine — the shared core behind every
2//! process-backed harness (bob, Claude Code, Codex, …).
3//!
4//! Spawns a child, pipes stdout/stderr line-by-line through a callback
5//! as [`ProcessEvent`]s, augments PATH so Node-based CLIs resolve even
6//! from a Finder-launched `.app`, and hands back a [`ProcessHandle`] for
7//! cancellation (SIGTERM → SIGKILL). No harness-trait or bob knowledge —
8//! purely subprocess streaming.
9//!
//! Cancellation is the wrinkle: a run needs to be stoppable mid-stream
//! when the user closes the tab or hits "stop". `ProcessHandle::cancel()`
//! sends SIGTERM (with a SIGKILL fallback) and flips an atomic
//! `cancelled` flag that is reported on the run's final `Exited` event.
14
15use crate::error::StreamError;
16use serde::Serialize;
17use std::io::{BufRead, BufReader, Read};
18use std::path::{Path, PathBuf};
19use std::process::{Child, Command, Stdio};
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{mpsc, Arc, Mutex, OnceLock};
22use std::thread;
23use std::time::Duration;
24
/// Raw events emitted to the caller's callback during a streaming run.
///
/// Serialized as an internally tagged JSON object (the `"kind"` key carries
/// the variant name) so axum SSE and a Tauri Channel render identical
/// payloads on the wire. Harness-neutral: a process-backed adapter parses the
/// `Stdout` lines into a normalized event vocabulary (e.g. `agent-harness`'s
/// `RunEvent`).
///
/// NOTE(review): on an enum, serde's `rename_all` appears to rename only the
/// variant tags (`"started"`, `"stdout"`, `"exited"`, …); the fields inside
/// each variant seem to keep their Rust names (`run_id`, `exit_code`) —
/// confirm that is what the SSE/Tauri consumers expect.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
// New lifecycle events can be added without breaking downstream matches —
// consumers must carry a `_` arm. (Construction of existing variants is
// unaffected, so it's still ergonomic to build them.)
#[non_exhaustive]
pub enum ProcessEvent {
    /// First event of a run. `spawn_streaming` emits it right after the
    /// child is spawned and before the reader threads start, so it precedes
    /// every `Stdout`/`Stderr` event and the UI can show a "thinking…" state.
    Started { run_id: String },
    /// One raw stdout line, without its trailing line ending. Process-backed
    /// CLIs emit one JSON object per line in their streaming mode; the caller
    /// parses.
    Stdout { run_id: String, line: String },
    /// One raw stderr line: warnings and the occasional error.
    Stderr { run_id: String, line: String },
    /// A failure while streaming: a pipe read error, or a failed wait on the
    /// child. Spawn failures are NOT reported here — `spawn_streaming`
    /// returns them as an `Err`. Not necessarily the last event: after a pipe
    /// read error the run keeps going and is closed by `Exited`.
    Error { run_id: String, message: String },
    /// The child has been reaped and both output streams have drained. Sent
    /// at most once and meant to be the run's final event.
    Exited {
        run_id: String,
        /// The exit code from `ExitStatus::code`; `None` when no code is
        /// available (e.g. the child was terminated by a signal).
        exit_code: Option<i32>,
        /// True iff `cancel()` was called before the exit was observed.
        cancelled: bool,
    },
}
54
/// Cloneable handle to one in-flight streaming run.
///
/// Callers typically keep it in a map keyed by run id so that a later
/// [`ProcessHandle::cancel`] can find the process. All clones share the same
/// underlying state.
///
/// Dropping a handle never cancels anything: the reader threads and the exit
/// watcher carry on independently. Call `cancel()` explicitly when the user
/// closes the connection.
#[derive(Clone, Debug)]
pub struct ProcessHandle { inner: Arc<HandleInner> }

/// State shared by every [`ProcessHandle`] clone and the exit watcher.
#[derive(Debug)]
struct HandleInner {
    /// The running child; cleared to `None` once the exit has been reported.
    child: Mutex<Option<Child>>,
    /// Set by `cancel()`; surfaced on the final `Exited` event.
    cancelled: AtomicBool,
}
66#[derive(Debug)]
67struct HandleInner {
68    child: Mutex<Option<Child>>,
69    cancelled: AtomicBool,
70}
71
72impl ProcessHandle {
73    /// SIGTERM the process, then SIGKILL after 1.5s if it's still alive.
74    /// The CLI is supposed to flush a final result on SIGTERM but we
75    /// don't trust it to do so forever.
76    pub fn cancel(&self) -> Result<(), StreamError> {
77        self.inner.cancelled.store(true, Ordering::SeqCst);
78        let mut guard = self
79            .inner
80            .child
81            .lock()
82            .map_err(|_| StreamError::CancelLockPoisoned)?;
83        let Some(child) = guard.as_mut() else {
84            // Already exited.
85            return Ok(());
86        };
87        // Best-effort SIGTERM. On Unix, kill() sends SIGKILL by default;
88        // we use libc::kill for SIGTERM, falling back to child.kill() if
89        // the libc call fails. On Windows there's only TerminateProcess
90        // via .kill().
91        #[cfg(unix)]
92        {
93            let pid = child.id() as i32;
94            // SAFETY: pid is the child's PID owned by this Child; sending
95            // SIGTERM is well-defined.
96            unsafe { libc::kill(pid, libc::SIGTERM) };
97            // Spawn the SIGKILL fallback inline to avoid holding the mutex
98            // while sleeping.
99            let inner = Arc::clone(&self.inner);
100            thread::spawn(move || {
101                thread::sleep(Duration::from_millis(1500));
102                if let Ok(mut guard) = inner.child.lock() {
103                    if let Some(child) = guard.as_mut() {
104                        let _ = child.kill();
105                    }
106                }
107            });
108        }
109        #[cfg(not(unix))]
110        {
111            let _ = child.kill();
112        }
113        Ok(())
114    }
115
116    /// Whether `cancel()` was called. Tagged on the final `Exited` event.
117    pub fn was_cancelled(&self) -> bool {
118        self.inner.cancelled.load(Ordering::SeqCst)
119    }
120}
121
122/// Spawn an arbitrary streaming child process — the generic engine behind
123/// every process-backed harness (bob, Claude Code, Codex).
124///
125/// Pipes stdout/stderr line-by-line through `callback` using the raw
126/// [`ProcessEvent`] vocabulary (Started / Stdout / Stderr / Error /
127/// Exited). `env` supplies per-harness secrets (each harness's API-key
128/// var, or none for self-authenticating CLIs). PATH is augmented so
129/// Node-based CLIs find `node`. Returns a [`ProcessHandle`] for
130/// cancellation.
131///
132/// `callback` is invoked from three threads (stdout reader, stderr
133/// reader, exit watcher); the `Clone` bound lets us hand a copy to each.
134/// `run_id` is opaque — the caller chooses it and uses it to correlate
135/// events with the handle.
136///
137/// ```no_run
138/// use cli_stream::{spawn_streaming, ProcessEvent};
139/// use std::path::PathBuf;
140///
141/// # fn main() -> Result<(), cli_stream::StreamError> {
142/// let handle = spawn_streaming(
143///     PathBuf::from("echo"),
144///     vec!["hello".to_owned()],
145///     Vec::new(),                      // extra env vars (key, value)
146///     std::env::current_dir().unwrap(),
147///     "run-1".to_owned(),              // your correlation id
148///     |event| match event {
149///         ProcessEvent::Stdout { line, .. } => println!("{line}"),
150///         ProcessEvent::Exited { exit_code, .. } => eprintln!("exit {exit_code:?}"),
151///         _ => {}
152///     },
153/// )?;
154/// // `handle.cancel()` stops it early; dropping the handle does not.
155/// let _ = handle;
156/// # Ok(())
157/// # }
158/// ```
159pub fn spawn_streaming<F>(
160    program: PathBuf,
161    args: Vec<String>,
162    env: Vec<(String, String)>,
163    cwd: PathBuf,
164    run_id: String,
165    callback: F,
166) -> Result<ProcessHandle, StreamError>
167where
168    F: FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
169{
170    // PATH augmentation: Node-based CLIs (bob, claude, codex) expect
171    // `node` (and often `npm`, `git`) on PATH. A desktop app launched
172    // from Finder/Launchpad inherits only the minimal launchd PATH
173    // (`/usr/bin:/bin:/usr/sbin:/sbin`), so an nvm-installed node is
174    // invisible and the child exits 127 ("command not found").
175    //
176    // Fix: prepend the program's parent dir (where node also lives in an
177    // nvm install) to the child's PATH. Added, not replaced, so a PATH
178    // the user explicitly set still wins on later lookups.
179    let augmented_path = augment_path_for_node(&program);
180
181    let mut command = Command::new(&program);
182    command
183        .args(&args)
184        .current_dir(&cwd)
185        .env("PATH", augmented_path)
186        .stdin(Stdio::null())
187        .stdout(Stdio::piped())
188        .stderr(Stdio::piped());
189    for (key, value) in &env {
190        command.env(key, value);
191    }
192    let mut child = command.spawn().map_err(|source| StreamError::Spawn {
193        program: program.display().to_string(),
194        source,
195    })?;
196
197    let stdout = child
198        .stdout
199        .take()
200        .ok_or(StreamError::PipeNotCaptured { stream: "stdout" })?;
201    let stderr = child
202        .stderr
203        .take()
204        .ok_or(StreamError::PipeNotCaptured { stream: "stderr" })?;
205
206    let inner = Arc::new(HandleInner {
207        child: Mutex::new(Some(child)),
208        cancelled: AtomicBool::new(false),
209    });
210    let handle = ProcessHandle { inner: Arc::clone(&inner) };
211
212    // Emit Started immediately so the caller doesn't wait on the first
213    // output line for a UI signal.
214    let mut started_cb = callback.clone();
215    started_cb(ProcessEvent::Started { run_id: run_id.clone() });
216
217    // Reader threads. Each owns its own callback clone — the Clone bound
218    // is the whole point.
219    let stdout_cb = callback.clone();
220    let stdout_run_id = run_id.clone();
221    let stdout_handle = thread::spawn(move || {
222        pump_lines(stdout, stdout_run_id, true, stdout_cb);
223    });
224
225    let stderr_cb = callback.clone();
226    let stderr_run_id = run_id.clone();
227    let stderr_handle = thread::spawn(move || {
228        pump_lines(stderr, stderr_run_id, false, stderr_cb);
229    });
230
231    // Exit watcher — emits the terminal Exited event with the cancellation
232    // flag. It must NOT hold the child lock across a blocking `wait()`:
233    // `cancel()` needs that same lock to signal the child, so a held lock
234    // would block cancel until the process exited on its own (defeating it).
235    // Instead poll `try_wait()`, locking only for each non-blocking check and
236    // releasing between polls so `cancel()` can acquire the lock mid-run.
237    let exit_inner = Arc::clone(&inner);
238    let mut exit_cb = callback;
239    let exit_run_id = run_id;
240    thread::spawn(move || {
241        let wait_result = loop {
242            {
243                let mut guard = match exit_inner.child.lock() {
244                    Ok(guard) => guard,
245                    Err(_) => return, // poisoned — nothing safe to do
246                };
247                match guard.as_mut() {
248                    Some(child) => match child.try_wait() {
249                        Ok(Some(status)) => break Ok(status),
250                        Ok(None) => {}            // still running; poll again
251                        Err(err) => break Err(err),
252                    },
253                    None => return, // already reaped
254                }
255            } // lock released before sleeping, so cancel() can acquire it
256            thread::sleep(Duration::from_millis(50));
257        };
258        let _ = stdout_handle.join();
259        let _ = stderr_handle.join();
260        let cancelled = exit_inner.cancelled.load(Ordering::SeqCst);
261
262        match wait_result {
263            Ok(status) => exit_cb(ProcessEvent::Exited {
264                run_id: exit_run_id.clone(),
265                exit_code: status.code(),
266                cancelled,
267            }),
268            Err(err) => exit_cb(ProcessEvent::Error {
269                run_id: exit_run_id.clone(),
270                message: format!("wait failed: {err}"),
271            }),
272        }
273
274        // Drop the child handle so subsequent cancel() calls
275        // short-circuit cleanly.
276        if let Ok(mut guard) = exit_inner.child.lock() {
277            *guard = None;
278        }
279    });
280
281    Ok(handle)
282}
283
284fn pump_lines<R, F>(reader: R, run_id: String, is_stdout: bool, mut callback: F)
285where
286    R: Read,
287    F: FnMut(ProcessEvent),
288{
289    let buffered = BufReader::new(reader);
290    for line in buffered.lines() {
291        match line {
292            Ok(text) => {
293                let event = if is_stdout {
294                    ProcessEvent::Stdout { run_id: run_id.clone(), line: text }
295                } else {
296                    ProcessEvent::Stderr { run_id: run_id.clone(), line: text }
297                };
298                callback(event);
299            }
300            Err(err) => {
301                callback(ProcessEvent::Error {
302                    run_id: run_id.clone(),
303                    message: format!("stream read failed: {err}"),
304                });
305                return;
306            }
307        }
308    }
309}
310
311/// Compose a PATH for the spawned process that always includes the
312/// directory containing the program — where `node`, `npm`, and friends
313/// usually live in an nvm install. The user's existing PATH stays as a
314/// fallback after our prepended directory.
315fn augment_path_for_node(program: &Path) -> String {
316    prepend_program_dir(program, &augmented_node_path())
317}
318
319/// Prepend the directory containing `program` (where `node` also lives in an
320/// nvm install) to `base_path`, so the resolved binary's own dir is searched
321/// first. Pure (no env / no spawn) so it's unit-tested directly.
322fn prepend_program_dir(program: &Path, base_path: &str) -> String {
323    match program
324        .parent()
325        .map(|p| p.display().to_string())
326        .filter(|s| !s.is_empty())
327    {
328        Some(dir) => format!("{dir}:{base_path}"),
329        None => base_path.to_owned(),
330    }
331}
332
333/// A PATH that resolves Node-based CLIs (bob, claude, codex) even from a
334/// process launched by Finder/Launchpad, which inherits only the minimal
335/// launchd PATH (`/usr/bin:/bin:/usr/sbin:/sbin`) rather than the user's
336/// shell PATH.
337///
338/// Strategy: keep the process's own PATH first (an explicit PATH still wins),
339/// then append the user's **real** PATH as resolved by their login shell —
340/// which sources their rc, so it knows where nvm / pnpm / volta / asdf / fnm /
341/// Homebrew put `node`, with no guessing. If the shell query is unavailable
342/// (no `$SHELL`, a timeout, a sandboxed app that can't spawn, …) we fall back
343/// to a hardcoded best-effort list, so we're never worse than before.
344///
345/// Used by the run path (which prepends the resolved binary's own dir on top
346/// of this) and by readiness probes that locate `claude`/`codex` via a bare
347/// `Command::new(name)`. Computed once and cached for the process — the
348/// (bounded) shell spawn happens at most once per launch, lazily on the first
349/// readiness/run/login, never at construction.
350pub fn augmented_node_path() -> String {
351    static CACHED: OnceLock<String> = OnceLock::new();
352    CACHED.get_or_init(compute_augmented_node_path).clone()
353}
354
355fn compute_augmented_node_path() -> String {
356    let mut parts: Vec<String> = Vec::new();
357    // The process's own PATH first — anything explicitly set still wins.
358    if let Ok(existing) = std::env::var("PATH") {
359        if !existing.is_empty() {
360            parts.push(existing);
361        }
362    }
363    // The user's real PATH (nvm/pnpm/volta/asdf/Homebrew) via their login
364    // shell; a hardcoded best-effort list if that's unavailable.
365    parts.push(login_shell_path().unwrap_or_else(hardcoded_node_dirs));
366    keep_absolute_entries(&parts.join(":"))
367}
368
369/// Keep only **absolute** PATH entries, dropping relative or empty ones (`.`,
370/// `""`, a direnv-style `node_modules/.bin`). Security: we spawn with
371/// `current_dir` set to the user's workspace — where the agent itself writes
372/// files and synced/downloaded content lands — so a relative/empty PATH entry
373/// (which resolves against that cwd) could run a planted `node`/`claude`. An
374/// empty entry is the classic implicit-cwd vector. Absolute dirs only.
375fn keep_absolute_entries(path: &str) -> String {
376    path.split(':')
377        .filter(|entry| entry.starts_with('/'))
378        .collect::<Vec<_>>()
379        .join(":")
380}
381
382/// Resolve PATH by asking the user's login + interactive shell — it sources
383/// their rc, so it knows wherever any node manager (nvm / pnpm / volta / asdf /
384/// fnm / Homebrew) put `node`, without us guessing. Bounded by a timeout so a
385/// slow or interactive rc can't hang us; returns `None` (→ hardcoded fallback)
386/// on any failure: no `$SHELL`, spawn refused (e.g. a sandboxed app), timeout,
387/// or no PATH in the output. Reads PATH from `env` (OS colon format,
388/// shell-agnostic — works for fish too) rather than expanding `$PATH`.
389///
390/// This *executes the user's shell rc*, exactly as opening a terminal does —
391/// their own shell, on their own machine. It is not a privilege/auth step: no
392/// "login session" is created; `-l`/`-i` only select which startup files are
393/// sourced (login profiles + the interactive rc where nvm usually lives).
394/// Printed on its own line right before `env`, so the parser can skip any
395/// shell-init chatter / terminal escape sequences (e.g. iTerm2 shell
396/// integration's `]1337;…` OSC codes) the interactive shell emits before our
397/// command runs — which would otherwise prepend to the `PATH=` line.
398const PATH_SENTINEL: &str = "__CLI_STREAM_PATH__";
399
400#[cfg(unix)]
401fn login_shell_path() -> Option<String> {
402    let shell = std::env::var("SHELL").ok().filter(|s| !s.is_empty())?;
403    // Print a sentinel line, then dump the environment. Reading PATH from `env`
404    // (not by expanding `$PATH`) keeps it OS colon format and shell-agnostic
405    // (fish stores PATH as a list); the sentinel lets the parser ignore
406    // anything the interactive shell prints at startup before `env` runs.
407    let script = format!("printf '\\n{PATH_SENTINEL}\\n'; env");
408    let mut child = Command::new(&shell)
409        .arg("-lic") // -l: login profiles, -i: interactive rc (nvm), -c: command
410        .arg(&script)
411        .stdin(Stdio::null())
412        .stdout(Stdio::piped())
413        .stderr(Stdio::null())
414        .spawn()
415        .ok()?;
416    // Read on a worker thread so the whole query can be bounded by a timeout —
417    // a misbehaving rc must not hang the app. Read bytes + lossy-decode (rather
418    // than `read_to_string`) so non-UTF-8 in the env dump degrades to
419    // replacement chars instead of discarding the whole output.
420    let mut stdout = child.stdout.take()?;
421    let (tx, rx) = mpsc::channel();
422    thread::spawn(move || {
423        let mut buf = Vec::new();
424        let _ = stdout.read_to_end(&mut buf);
425        let _ = tx.send(String::from_utf8_lossy(&buf).into_owned());
426    });
427    // 4s: generous enough for a heavy rc (oh-my-zsh + plugins + nvm lazy-load)
428    // to finish, since this is paid at most once (cached); on timeout we kill
429    // the shell and fall back to the hardcoded list.
430    let output = match rx.recv_timeout(Duration::from_secs(4)) {
431        Ok(buf) => buf,
432        Err(_) => {
433            let _ = child.kill();
434            let _ = child.wait();
435            return None;
436        }
437    };
438    let _ = child.wait();
439    parse_path_from_shell_output(&output)
440}
441
442#[cfg(not(unix))]
443fn login_shell_path() -> Option<String> {
444    None
445}
446
447/// Extract the `PATH=…` value from the shell's `printf <sentinel>; env` output.
448/// Everything up to (and including) the last sentinel is discarded — that's
449/// where shell-init chatter and terminal escape sequences live — then the
450/// `PATH=` line is read from the clean `env` dump that follows. `None` if the
451/// sentinel is missing (query misbehaved) or PATH is absent/empty.
452fn parse_path_from_shell_output(output: &str) -> Option<String> {
453    output
454        .rsplit_once(PATH_SENTINEL)?
455        .1
456        .lines()
457        .find_map(|line| line.strip_prefix("PATH="))
458        .map(str::trim)
459        .filter(|p| !p.is_empty())
460        .map(str::to_owned)
461}
462
463/// Hardcoded best-effort node locations — the fallback when the login-shell
464/// query is unavailable. Leans on the *universal* dirs every distro + macOS
465/// share: `/usr/bin` + `/usr/local/bin` are where apt/dnf/yum/pacman and the
466/// official Node tarball install, so the common Linux container case is covered
467/// without distro-specific guessing. Plus macOS Homebrew, the official-installer
468/// dir, and any nvm-managed node. Anything manager-specific (pnpm/volta/asdf,
469/// Linuxbrew, snap, …) is what the login-shell query is for — and a missing
470/// dir is just skipped, so this is never worse than the bare launchd PATH.
471fn hardcoded_node_dirs() -> String {
472    let mut parts: Vec<String> =
473        vec!["/usr/local/bin:/opt/homebrew/bin:/usr/bin:/bin:/usr/sbin:/sbin".to_owned()];
474    if let Ok(home) = std::env::var("HOME") {
475        if !home.is_empty() {
476            let home_path = Path::new(&home);
477            // Official-installer location for several agent CLIs.
478            parts.push(home_path.join(".local/bin").display().to_string());
479            // nvm: ~/.nvm/versions/node/<version>/bin — where npm-global
480            // CLIs (bob, claude, codex) live under an nvm-managed node.
481            if let Ok(entries) = std::fs::read_dir(home_path.join(".nvm/versions/node")) {
482                for entry in entries.flatten() {
483                    let bin = entry.path().join("bin");
484                    if bin.is_dir() {
485                        parts.push(bin.display().to_string());
486                    }
487                }
488            }
489        }
490    }
491    parts.join(":")
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497
498    #[test]
499    fn hardcoded_fallback_includes_macos_defaults() {
500        // The fallback (used when the login-shell query is unavailable) must
501        // still carry Homebrew + the system bins, so a launchd-spawned `.app`
502        // resolves CLIs even without a usable shell — the original
503        // "not installed" fix.
504        let path = hardcoded_node_dirs();
505        assert!(path.contains("/opt/homebrew/bin"), "missing Apple-Silicon Homebrew bin");
506        assert!(path.contains("/usr/local/bin"), "missing Intel Homebrew / system bin");
507        assert!(path.contains("/usr/bin"), "missing system bin");
508    }
509
510    #[test]
511    fn parse_path_from_shell_output_skips_chatter_before_the_sentinel() {
512        // Real-world shape: iTerm2 OSC escapes + a banner emitted at shell
513        // startup, BEFORE our sentinel + `env` dump. Only the post-sentinel
514        // PATH= line counts — note the pre-sentinel "PATH=/decoy" is ignored.
515        let output = "\u{1b}]1337;RemoteHost=x\u{7}welcome banner\nPATH=/decoy\n__CLI_STREAM_PATH__\nHOME=/Users/x\nPATH=/opt/homebrew/bin:/usr/bin\nLANG=en_US";
516        assert_eq!(
517            parse_path_from_shell_output(output).as_deref(),
518            Some("/opt/homebrew/bin:/usr/bin")
519        );
520        // No sentinel (query misbehaved) → None, so the caller falls back —
521        // even if a bare PATH= is present.
522        assert_eq!(parse_path_from_shell_output("PATH=/usr/bin"), None);
523        // Sentinel present but PATH absent/empty → None.
524        assert_eq!(parse_path_from_shell_output("__CLI_STREAM_PATH__\nFOO=bar"), None);
525        assert_eq!(parse_path_from_shell_output("__CLI_STREAM_PATH__\nPATH=\nFOO=bar"), None);
526    }
527
528    #[test]
529    fn keep_absolute_entries_drops_relative_and_empty() {
530        // Relative (`node_modules/.bin`, `.`) and empty entries — which resolve
531        // against the spawn cwd (the user's workspace) — are dropped; absolute
532        // dirs survive in order.
533        assert_eq!(
534            keep_absolute_entries("/opt/homebrew/bin:node_modules/.bin:/usr/bin:.::/bin"),
535            "/opt/homebrew/bin:/usr/bin:/bin"
536        );
537        assert_eq!(keep_absolute_entries("/usr/bin"), "/usr/bin");
538        // All-relative → empty (caller still has the process PATH ahead of it).
539        assert_eq!(keep_absolute_entries(".:rel:"), "");
540    }
541
542    #[test]
543    fn prepend_program_dir_puts_the_binary_dir_first() {
544        let combined = prepend_program_dir(
545            Path::new("/Users/x/.nvm/versions/node/v22/bin/bob"),
546            "/opt/homebrew/bin:/usr/bin",
547        );
548        assert!(combined.starts_with("/Users/x/.nvm/versions/node/v22/bin:"));
549        assert!(combined.contains("/opt/homebrew/bin"));
550        // A bare program name has no parent dir → base path unchanged.
551        assert_eq!(prepend_program_dir(Path::new("bob"), "/usr/bin"), "/usr/bin");
552    }
553
554    #[test]
555    fn augmented_node_path_is_nonempty_and_resolves_system_bin() {
556        // Exercises the cached public path once. `/usr/bin` is present whether
557        // the shell query succeeds (real PATH) or falls back (hardcoded), and
558        // is on the bare launchd PATH too — so this holds in any environment.
559        let path = augmented_node_path();
560        assert!(!path.is_empty());
561        assert!(path.contains("/usr/bin"), "system bin must always resolve");
562    }
563}
564
565/// End-to-end lifecycle tests that spawn real processes. Unix-only: they use
566/// `printf` / `sh` / `sleep`, and the cancel path is signal-based here.
567#[cfg(all(test, unix))]
568mod lifecycle {
569    use super::*;
570    use std::sync::Condvar;
571    use std::time::Instant;
572
573    type Done = Arc<(Mutex<bool>, Condvar)>;
574
575    /// A thread-safe event collector that signals `done` on the terminal
576    /// event. Returns the (cloneable) callback + the shared collections.
577    fn collector() -> (
578        impl FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
579        Arc<Mutex<Vec<ProcessEvent>>>,
580        Done,
581    ) {
582        let events = Arc::new(Mutex::new(Vec::new()));
583        let done: Done = Arc::new((Mutex::new(false), Condvar::new()));
584        let cb = {
585            let events = Arc::clone(&events);
586            let done = Arc::clone(&done);
587            move |ev: ProcessEvent| {
588                let terminal =
589                    matches!(ev, ProcessEvent::Exited { .. } | ProcessEvent::Error { .. });
590                events.lock().unwrap().push(ev);
591                if terminal {
592                    let (lock, cvar) = &*done;
593                    *lock.lock().unwrap() = true;
594                    cvar.notify_all();
595                }
596            }
597        };
598        (cb, events, done)
599    }
600
601    /// Block until the terminal event fires, or panic after `secs`.
602    fn wait_done(done: &Done, secs: u64) {
603        let (lock, cvar) = &**done;
604        let mut finished = lock.lock().unwrap();
605        let deadline = Instant::now() + Duration::from_secs(secs);
606        while !*finished {
607            let now = Instant::now();
608            assert!(now < deadline, "process did not finish within {secs}s");
609            let (guard, _) = cvar.wait_timeout(finished, deadline - now).unwrap();
610            finished = guard;
611        }
612    }
613
614    /// Spawn `program args`, block until it exits, return every event.
615    fn run(program: &str, args: &[&str]) -> Vec<ProcessEvent> {
616        let (cb, events, done) = collector();
617        let _handle = spawn_streaming(
618            PathBuf::from(program),
619            args.iter().map(|s| (*s).to_owned()).collect(),
620            Vec::new(),
621            PathBuf::from("."),
622            "t".to_owned(),
623            cb,
624        )
625        .expect("spawn");
626        wait_done(&done, 10);
627        let events = events.lock().unwrap();
628        events.clone()
629    }
630
631    #[test]
632    fn streams_stdout_lines_then_exits_zero() {
633        let events = run("printf", &["%s\n", "alpha", "beta"]);
634        // Started leads, Exited(0, not cancelled) closes.
635        assert!(matches!(events.first(), Some(ProcessEvent::Started { .. })));
636        assert!(matches!(
637            events.last(),
638            Some(ProcessEvent::Exited { exit_code: Some(0), cancelled: false, .. })
639        ));
640        // Lines arrive in order, one event each.
641        let lines: Vec<&str> = events
642            .iter()
643            .filter_map(|e| match e {
644                ProcessEvent::Stdout { line, .. } => Some(line.as_str()),
645                _ => None,
646            })
647            .collect();
648        assert_eq!(lines, vec!["alpha", "beta"]);
649    }
650
651    #[test]
652    fn nonzero_exit_code_is_reported() {
653        let events = run("sh", &["-c", "exit 3"]);
654        assert!(matches!(
655            events.last(),
656            Some(ProcessEvent::Exited { exit_code: Some(3), cancelled: false, .. })
657        ));
658    }
659
660    #[test]
661    fn env_vars_are_passed_to_the_child() {
662        // The `env` argument must reach the child's environment — exercise it
663        // directly (the other lifecycle tests pass an empty env).
664        let (cb, events, done) = collector();
665        let _handle = spawn_streaming(
666            PathBuf::from("sh"),
667            vec!["-c".to_owned(), "printf '%s\\n' \"$CLI_STREAM_STUB\"".to_owned()],
668            vec![("CLI_STREAM_STUB".to_owned(), "from-env".to_owned())],
669            PathBuf::from("."),
670            "t".to_owned(),
671            cb,
672        )
673        .expect("spawn");
674        wait_done(&done, 10);
675        let events = events.lock().unwrap();
676        assert!(
677            events
678                .iter()
679                .any(|e| matches!(e, ProcessEvent::Stdout { line, .. } if line == "from-env")),
680            "child should observe the injected env var, got {events:?}"
681        );
682    }
683
684    #[test]
685    fn stderr_is_streamed_and_not_misrouted_to_stdout() {
686        let events = run("sh", &["-c", "echo to-stderr 1>&2"]);
687        assert!(events
688            .iter()
689            .any(|e| matches!(e, ProcessEvent::Stderr { line, .. } if line == "to-stderr")));
690        assert!(!events.iter().any(|e| matches!(e, ProcessEvent::Stdout { .. })));
691        assert!(events
692            .iter()
693            .any(|e| matches!(e, ProcessEvent::Exited { exit_code: Some(0), .. })));
694    }
695
696    #[test]
697    fn cancel_promptly_terminates_the_run_and_flags_it() {
698        // A 10s sleeper we cancel ~immediately; a working engine must kill it
699        // far sooner than 10s. `exec` so the process *is* sleep (no orphan).
700        let (cb, events, done) = collector();
701        let handle = spawn_streaming(
702            PathBuf::from("sh"),
703            vec!["-c".to_owned(), "exec sleep 10".to_owned()],
704            Vec::new(),
705            PathBuf::from("."),
706            "t".to_owned(),
707            cb,
708        )
709        .expect("spawn");
710
711        // cancel() may block until the child is reaped, so fire it off-thread.
712        let canceller = handle.clone();
713        thread::spawn(move || {
714            thread::sleep(Duration::from_millis(100));
715            let _ = canceller.cancel();
716        });
717
718        // Correct cancellation terminates the 10s sleep within a few seconds.
719        wait_done(&done, 4);
720        assert!(handle.was_cancelled());
721        let events = events.lock().unwrap();
722        assert!(
723            matches!(events.last(), Some(ProcessEvent::Exited { cancelled: true, .. })),
724            "expected Exited(cancelled=true), got {:?}",
725            events.last()
726        );
727    }
728
729    #[test]
730    fn spawning_a_missing_binary_is_err() {
731        let result = spawn_streaming(
732            PathBuf::from("cli-stream-no-such-binary-zzz"),
733            Vec::new(),
734            Vec::new(),
735            PathBuf::from("."),
736            "t".to_owned(),
737            |_ev: ProcessEvent| {},
738        );
739        // Typed: a `Spawn` error carrying the OS `NotFound` io::Error as its
740        // source — the whole point of `StreamError` over a `String`. A caller
741        // can branch on `ErrorKind` to tell "not installed" (NotFound) from
742        // "permission denied", which a flattened string can't support.
743        match result {
744            Err(StreamError::Spawn { program, source }) => {
745                assert!(program.contains("cli-stream-no-such-binary-zzz"));
746                assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
747            }
748            other => panic!("expected StreamError::Spawn, got {other:?}"),
749        }
750    }
751}