Skip to main content

rusty_pee/
capture.rs

1//! `--capture` mode: replace each child's stdout with a piped handle, buffer
2//! to completion, emit captured chunks in argv order after all children exit
3//! (FR-017 + AD-007).
4//!
5//! Without `--capture` (the default), children inherit the parent's stdout
6//! and their outputs interleave nondeterministically. With `--capture`, the
7//! parent guarantees argv-ordered emission — useful for tests and scripts
8//! that need reproducible output (US5).
9//!
10//! Clarification Q2 + Q6 from spec.md:
11//! - Q2: at spawn time, each child's stdout is replaced with `Stdio::piped()`.
12//! - Q6: empty-stdout children emit nothing (no separator inserted).
13
14use std::io::Read;
15use std::process::{Child, ExitStatus};
16
17use crate::fanout;
18use crate::spawner;
19
20/// Spawn all commands with piped stdout (for capture mode). Returns the
21/// spawned children in argv order. On any spawn failure, kills already-spawned
22/// children and returns the io error.
23pub fn spawn_all_piped(cmds: &[String]) -> std::io::Result<Vec<Child>> {
24    let mut children = Vec::with_capacity(cmds.len());
25    for cmd in cmds {
26        match spawner::spawn_one_piped_stdout(cmd) {
27            Ok(c) => children.push(c),
28            Err(e) => {
29                for mut prior in children.into_iter() {
30                    let _ = prior.kill();
31                    let _ = prior.wait();
32                }
33                return Err(e);
34            }
35        }
36    }
37    Ok(children)
38}
39
40/// Drive the fan-out + capture: feed reader to every child's stdin, then read
41/// every child's stdout into a per-argv-position buffer, then wait + collect
42/// statuses, then emit captured bytes in argv order to `out`.
43///
44/// Returns the per-child exit statuses for downstream aggregation.
45pub fn run_with_capture<R: Read, W: std::io::Write>(
46    reader: R,
47    children: Vec<Child>,
48    out: &mut W,
49) -> std::io::Result<Vec<ExitStatus>> {
50    // Take ownership of every child's stdout BEFORE the fan-out loop closes
51    // their stdin handles (otherwise the children would block on writes if
52    // they emit before consuming all input).
53    let mut children = children;
54    let stdouts: Vec<Option<std::process::ChildStdout>> =
55        children.iter_mut().map(|c| c.stdout.take()).collect();
56
57    // Spawn a thread per child to drain its stdout in parallel with the
58    // fan-out write loop. This prevents a child that emits a lot of stdout
59    // from blocking on its own pipe buffer while we're still feeding it
60    // stdin.
61    let drainer_handles: Vec<std::thread::JoinHandle<std::io::Result<Vec<u8>>>> = stdouts
62        .into_iter()
63        .map(|maybe_stdout| {
64            std::thread::spawn(move || {
65                let mut buf = Vec::new();
66                if let Some(mut h) = maybe_stdout {
67                    h.read_to_end(&mut buf)?;
68                }
69                Ok(buf)
70            })
71        })
72        .collect();
73
74    // Run the fan-out write loop (closes stdin + waits children).
75    let statuses = fanout::run(reader, children)?;
76
77    // Join all drainer threads to collect each child's full stdout.
78    let mut captured: Vec<Vec<u8>> = Vec::with_capacity(drainer_handles.len());
79    for handle in drainer_handles {
80        let bytes = handle
81            .join()
82            .map_err(|_| std::io::Error::other("capture drainer thread panicked"))??;
83        captured.push(bytes);
84    }
85
86    // FR-017 + Clarification Q6: emit in argv order. Each non-empty captured
87    // chunk is separated from the next by a single LF if it doesn't already
88    // end with one. Empty children emit nothing (no separator inserted).
89    for bytes in &captured {
90        if bytes.is_empty() {
91            continue;
92        }
93        out.write_all(bytes)?;
94        if !bytes.ends_with(b"\n") {
95            out.write_all(b"\n")?;
96        }
97    }
98    out.flush()?;
99
100    Ok(statuses)
101}