rusty_pee/capture.rs
1//! `--capture` mode: replace each child's stdout with a piped handle, buffer
2//! to completion, emit captured chunks in argv order after all children exit
3//! (FR-017 + AD-007).
4//!
5//! Without `--capture` (the default), children inherit the parent's stdout
6//! and their outputs interleave nondeterministically. With `--capture`, the
7//! parent guarantees argv-ordered emission — useful for tests and scripts
8//! that need reproducible output (US5).
9//!
10//! Clarification Q2 + Q6 from spec.md:
11//! - Q2: at spawn time, each child's stdout is replaced with `Stdio::piped()`.
12//! - Q6: empty-stdout children emit nothing (no separator inserted).
13
14use std::io::Read;
15use std::process::{Child, ExitStatus};
16
17use crate::fanout;
18use crate::spawner;
19
20/// Spawn all commands with piped stdout (for capture mode). Returns the
21/// spawned children in argv order. On any spawn failure, kills already-spawned
22/// children and returns the io error.
23pub fn spawn_all_piped(cmds: &[String]) -> std::io::Result<Vec<Child>> {
24 let mut children = Vec::with_capacity(cmds.len());
25 for cmd in cmds {
26 match spawner::spawn_one_piped_stdout(cmd) {
27 Ok(c) => children.push(c),
28 Err(e) => {
29 for mut prior in children.into_iter() {
30 let _ = prior.kill();
31 let _ = prior.wait();
32 }
33 return Err(e);
34 }
35 }
36 }
37 Ok(children)
38}
39
40/// Drive the fan-out + capture: feed reader to every child's stdin, then read
41/// every child's stdout into a per-argv-position buffer, then wait + collect
42/// statuses, then emit captured bytes in argv order to `out`.
43///
44/// Returns the per-child exit statuses for downstream aggregation.
45pub fn run_with_capture<R: Read, W: std::io::Write>(
46 reader: R,
47 children: Vec<Child>,
48 out: &mut W,
49) -> std::io::Result<Vec<ExitStatus>> {
50 // Take ownership of every child's stdout BEFORE the fan-out loop closes
51 // their stdin handles (otherwise the children would block on writes if
52 // they emit before consuming all input).
53 let mut children = children;
54 let stdouts: Vec<Option<std::process::ChildStdout>> =
55 children.iter_mut().map(|c| c.stdout.take()).collect();
56
57 // Spawn a thread per child to drain its stdout in parallel with the
58 // fan-out write loop. This prevents a child that emits a lot of stdout
59 // from blocking on its own pipe buffer while we're still feeding it
60 // stdin.
61 let drainer_handles: Vec<std::thread::JoinHandle<std::io::Result<Vec<u8>>>> = stdouts
62 .into_iter()
63 .map(|maybe_stdout| {
64 std::thread::spawn(move || {
65 let mut buf = Vec::new();
66 if let Some(mut h) = maybe_stdout {
67 h.read_to_end(&mut buf)?;
68 }
69 Ok(buf)
70 })
71 })
72 .collect();
73
74 // Run the fan-out write loop (closes stdin + waits children).
75 let statuses = fanout::run(reader, children)?;
76
77 // Join all drainer threads to collect each child's full stdout.
78 let mut captured: Vec<Vec<u8>> = Vec::with_capacity(drainer_handles.len());
79 for handle in drainer_handles {
80 let bytes = handle
81 .join()
82 .map_err(|_| std::io::Error::other("capture drainer thread panicked"))??;
83 captured.push(bytes);
84 }
85
86 // FR-017 + Clarification Q6: emit in argv order. Each non-empty captured
87 // chunk is separated from the next by a single LF if it doesn't already
88 // end with one. Empty children emit nothing (no separator inserted).
89 for bytes in &captured {
90 if bytes.is_empty() {
91 continue;
92 }
93 out.write_all(bytes)?;
94 if !bytes.ends_with(b"\n") {
95 out.write_all(b"\n")?;
96 }
97 }
98 out.flush()?;
99
100 Ok(statuses)
101}