rusty_pee/lib.rs
1//! # rusty-pee
2//!
3//! A Rust port of the moreutils `pee` utility: fan a single stdin stream out
4//! to N concurrent shell-spawned children, aggregate their exit codes, and
5//! surface failures cleanly.
6//!
7//! ## Quick start
8//!
9//! ```no_run
10//! use rusty_pee::{PeeBuilder, CompatibilityMode};
11//! use std::io::Cursor;
12//!
13//! // Construct sinks that the builder owns (so they satisfy `'static`).
14//! let sink_a: Vec<u8> = Vec::new();
15//! let sink_b: Vec<u8> = Vec::new();
16//!
17//! let mut pee = PeeBuilder::new()
18//! .sink(Box::new(sink_a))
19//! .sink(Box::new(sink_b))
20//! .compat(CompatibilityMode::Default)
21//! .build()?;
22//!
23//! let input = Cursor::new(b"alpha\nbravo\ncharlie\n".to_vec());
24//! pee.run(input)?;
25//! # Ok::<(), rusty_pee::Error>(())
26//! ```
27//!
28//! ## Stability (lockstep SemVer)
29//!
30//! Library and binary share a single crate version. Within `0.x`, minor
31//! version bumps may introduce breaking changes per standard Cargo
32//! semantics. Every public enum and struct is `#[non_exhaustive]` so
33//! variant additions are not breaking changes once `1.0` lands.
34//!
35//! ## Pipeline-safety contract
36//!
37//! When a sink errors mid-chunk during [`Pee::run`], every other live sink
38//! receives the **complete current chunk** in registration order before the
39//! failing sink is dropped from the live-set (mirrors the CLI's
40//! `--ignore-write-errors` default — frozen-on per FR-003).
41
42pub mod error;
43
44pub use error::Error;
45
46/// Whether to apply Default-mode ergonomic extensions or Strict moreutils parity.
47///
48/// # Examples
49///
50/// ```
51/// use rusty_pee::CompatibilityMode;
52///
53/// assert_eq!(CompatibilityMode::default(), CompatibilityMode::Default);
54/// // Strict mode rejects `--capture`, `--help`, `--version`, and completions.
55/// let _ = CompatibilityMode::Strict;
56/// ```
57#[non_exhaustive]
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
59pub enum CompatibilityMode {
60 /// Default mode: `--help`, `--version`, `--capture`, `completions` subcommand all honored.
61 /// Exit aggregation uses `max(child_codes)`.
62 #[default]
63 Default,
64 /// Strict mode: byte-equal moreutils stderr for documented inputs;
65 /// exit aggregation uses bitwise OR (matches moreutils `close_pipes()`).
66 Strict,
67}
68
69/// Default fan-out chunk size (64 KiB per AD-015). Not user-configurable in v0.1.0.
70pub const BUFSIZ: usize = 64 * 1024;
71
72/// Runtime engine for one pee invocation. Constructed via [`PeeBuilder`].
73///
74/// # Examples
75///
76/// ```no_run
77/// # use rusty_pee::{PeeBuilder, CompatibilityMode};
78/// # use std::io::Cursor;
79/// let mut pee = PeeBuilder::new().build()?;
80/// let input = Cursor::new(b"hello".to_vec());
81/// pee.run(input)?;
82/// # Ok::<(), rusty_pee::Error>(())
83/// ```
84#[non_exhaustive]
85pub struct Pee {
86 sinks: Vec<Box<dyn std::io::Write + Send>>,
87 compat: CompatibilityMode,
88 capture: bool,
89 ignore_write_errors: bool,
90}
91
92impl std::fmt::Debug for Pee {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.debug_struct("Pee")
95 .field("sinks_count", &self.sinks.len())
96 .field("compat", &self.compat)
97 .field("capture", &self.capture)
98 .field("ignore_write_errors", &self.ignore_write_errors)
99 .finish()
100 }
101}
102
103/// Builder for [`Pee`]. All chain methods are `#[must_use]`.
104///
105/// # Examples
106///
107/// ```
108/// use rusty_pee::{PeeBuilder, CompatibilityMode};
109///
110/// let pee = PeeBuilder::new()
111/// .compat(CompatibilityMode::Default)
112/// .build()
113/// .expect("default builder always succeeds");
114/// # let _ = pee;
115/// ```
116#[non_exhaustive]
117pub struct PeeBuilder {
118 sinks: Vec<Box<dyn std::io::Write + Send>>,
119 compat: CompatibilityMode,
120 capture: bool,
121 ignore_write_errors: bool,
122}
123
124impl std::fmt::Debug for PeeBuilder {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("PeeBuilder")
127 .field("sinks_count", &self.sinks.len())
128 .field("compat", &self.compat)
129 .field("capture", &self.capture)
130 .field("ignore_write_errors", &self.ignore_write_errors)
131 .finish()
132 }
133}
134
135impl Default for PeeBuilder {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl PeeBuilder {
142 /// Construct a new builder defaulting to zero sinks, `Default` compat,
143 /// `capture(false)`, and `ignore_write_errors(true)`.
144 #[must_use]
145 pub fn new() -> Self {
146 Self {
147 sinks: Vec::new(),
148 compat: CompatibilityMode::Default,
149 capture: false,
150 ignore_write_errors: true,
151 }
152 }
153
154 /// Register a sink. Sinks receive the same bytes in registration order.
155 #[must_use]
156 pub fn sink(mut self, sink: Box<dyn std::io::Write + Send>) -> Self {
157 self.sinks.push(sink);
158 self
159 }
160
161 /// Set the compatibility mode.
162 #[must_use]
163 pub fn compat(mut self, compat: CompatibilityMode) -> Self {
164 self.compat = compat;
165 self
166 }
167
168 /// Set capture-mode flag. Library no-op (sinks already capture by definition);
169 /// retained for parity with the CLI flag and validated against Strict mode at build time.
170 #[must_use]
171 pub fn capture(mut self, capture: bool) -> Self {
172 self.capture = capture;
173 self
174 }
175
176 /// Set the ignore-write-errors policy. Default: `true` (matches moreutils
177 /// `--ignore-write-errors` default-on behavior). When `false`, the first
178 /// sink write error halts the fan-out and is surfaced as
179 /// `Error::SinkWriteFailed`.
180 #[must_use]
181 pub fn ignore_write_errors(mut self, ignore: bool) -> Self {
182 self.ignore_write_errors = ignore;
183 self
184 }
185
186 /// Validate and build a [`Pee`].
187 pub fn build(self) -> Result<Pee, Error> {
188 // FR-022: Strict + capture(true) is rejected at build time.
189 if self.compat == CompatibilityMode::Strict && self.capture {
190 return Err(Error::CompatibilityViolation(
191 "--capture not honored in Strict mode",
192 ));
193 }
194 Ok(Pee {
195 sinks: self.sinks,
196 compat: self.compat,
197 capture: self.capture,
198 ignore_write_errors: self.ignore_write_errors,
199 })
200 }
201}
202
203impl Pee {
204 /// Read from `reader` and fan out each chunk to every registered sink in
205 /// registration order. On sink `BrokenPipe`, the sink is dropped from the
206 /// live-set and the fan-out continues to the remaining sinks (FR-003 + FR-036).
207 ///
208 /// **Writer-untouched invariant** (FR-022): every other live sink receives
209 /// the COMPLETE current chunk in registration order before the failing
210 /// sink is dropped.
211 ///
212 /// **Zero-sink case** (FR-006): drains the reader to completion and returns Ok.
213 pub fn run<R: std::io::Read>(&mut self, mut reader: R) -> Result<(), Error> {
214 let _ = (self.compat, self.capture); // hold while we implement
215
216 if self.sinks.is_empty() {
217 // Drain reader to nothing.
218 let mut buf = [0u8; BUFSIZ];
219 loop {
220 let n = reader.read(&mut buf)?;
221 if n == 0 {
222 break;
223 }
224 }
225 return Ok(());
226 }
227
228 let mut buf = vec![0u8; BUFSIZ];
229 // Live-set: indices into self.sinks that haven't errored yet.
230 let mut live: Vec<usize> = (0..self.sinks.len()).collect();
231
232 loop {
233 let n = reader.read(&mut buf)?;
234 if n == 0 {
235 break;
236 }
237 // FR-036: write the full chunk to each live sink in registration
238 // order before dropping any that error.
239 let mut to_drop: Vec<usize> = Vec::new();
240 for &idx in &live {
241 use std::io::Write;
242 match self.sinks[idx].write_all(&buf[..n]) {
243 Ok(()) => {}
244 Err(e)
245 if e.kind() == std::io::ErrorKind::BrokenPipe
246 && self.ignore_write_errors =>
247 {
248 to_drop.push(idx);
249 }
250 Err(e) if self.ignore_write_errors => {
251 // Non-BrokenPipe write errors with ignore-on: still drop the sink silently.
252 to_drop.push(idx);
253 let _ = e;
254 }
255 Err(e) => {
256 return Err(Error::SinkWriteFailed {
257 sink_index: idx,
258 source: e,
259 });
260 }
261 }
262 }
263 live.retain(|i| !to_drop.contains(i));
264 if live.is_empty() {
265 // All sinks closed; drain reader to nothing.
266 loop {
267 let n = reader.read(&mut buf)?;
268 if n == 0 {
269 break;
270 }
271 }
272 break;
273 }
274 }
275
276 // Flush surviving sinks.
277 for &idx in &live {
278 use std::io::Write;
279 let _ = self.sinks[idx].flush();
280 }
281 Ok(())
282 }
283}
284
285// CLI / mode / signal / strict / spawner / fanout / aggregate / capture
286// internals are gated behind `cli` because they pull clap, signal-hook, and
287// other binary-only deps. Library callers configure compat mode via the builder.
288#[cfg(feature = "cli")]
289pub mod aggregate;
290#[cfg(feature = "cli")]
291pub mod capture;
292#[cfg(feature = "cli")]
293pub mod cli;
294#[cfg(feature = "cli")]
295pub mod fanout;
296#[cfg(feature = "cli")]
297pub mod mode;
298#[cfg(feature = "cli")]
299pub mod spawner;
300#[cfg(feature = "cli")]
301pub mod strict;
302
303/// Binary entry-point helper used by both `src/main.rs` and `src/bin/pee.rs`.
304///
305/// Per FR-007/FR-008/AD-002/AD-003: Default-mode exit aggregation uses
306/// `max(child_codes)`; Strict mode uses bitwise OR.
307#[cfg(feature = "cli")]
308pub fn run() -> std::process::ExitCode {
309 use clap::Parser;
310 use std::ffi::OsString;
311 use std::process::ExitCode;
312
313 let raw_argv: Vec<OsString> = std::env::args_os().collect();
314
315 // Pre-clap detection of `--strict` / `--no-strict` + env + argv[0] for
316 // Strict-mode dispatch. Strict mode bypasses clap entirely.
317 let pre_strict = strict::pre_scan_strict_flag(&raw_argv);
318 let env_strict = std::env::var_os("RUSTY_PEE_STRICT");
319 let argv0 = raw_argv.first().cloned();
320 let resolved_mode = mode::resolve(pre_strict, env_strict.as_deref(), argv0.as_deref());
321
322 if resolved_mode == CompatibilityMode::Strict {
323 return strict::run(&raw_argv);
324 }
325
326 let cli_args = match cli::Cli::try_parse() {
327 Ok(args) => args,
328 Err(e) => {
329 e.print().ok();
330 return match e.kind() {
331 clap::error::ErrorKind::DisplayHelp | clap::error::ErrorKind::DisplayVersion => {
332 ExitCode::SUCCESS
333 }
334 _ => ExitCode::from(2),
335 };
336 }
337 };
338
339 // Completions subcommand (Default only).
340 if let Some(cli::Subcommand::Completions { shell }) = cli_args.command {
341 use clap::CommandFactory;
342 let mut cmd = cli::Cli::command();
343 let name = cmd.get_name().to_string();
344 clap_complete::generate(shell, &mut cmd, name, &mut std::io::stdout());
345 return ExitCode::SUCCESS;
346 }
347
348 // FR-006: zero commands → drain stdin and exit 0.
349 if cli_args.commands.is_empty() {
350 use std::io::Read;
351 let stdin = std::io::stdin();
352 let mut handle = stdin.lock();
353 let mut buf = [0u8; BUFSIZ];
354 loop {
355 match handle.read(&mut buf) {
356 Ok(0) => break,
357 Ok(_) => {}
358 Err(e) => {
359 eprintln!("rusty-pee: stdin read error: {e}");
360 return ExitCode::from(1);
361 }
362 }
363 }
364 return ExitCode::SUCCESS;
365 }
366
367 // Capture mode (FR-017): replace each child's stdout with `Stdio::piped()`,
368 // drain in parallel via worker threads, emit captured chunks in argv order.
369 let statuses = if cli_args.capture {
370 let children = match capture::spawn_all_piped(&cli_args.commands) {
371 Ok(c) => c,
372 Err(e) => {
373 eprintln!("rusty-pee: failed to spawn child: {e}");
374 return ExitCode::from(127);
375 }
376 };
377 let stdin = std::io::stdin();
378 let stdout = std::io::stdout();
379 let mut out = stdout.lock();
380 match capture::run_with_capture(stdin.lock(), children, &mut out) {
381 Ok(s) => s,
382 Err(e) => {
383 eprintln!("rusty-pee: capture error: {e}");
384 return ExitCode::from(1);
385 }
386 }
387 } else {
388 // Default fan-out (FR-002, FR-003, FR-004) — children inherit parent stdio.
389 let mut children = Vec::with_capacity(cli_args.commands.len());
390 for cmd in &cli_args.commands {
391 match spawner::spawn_one(cmd) {
392 Ok(c) => children.push(c),
393 Err(e) => {
394 eprintln!("rusty-pee: failed to spawn child '{cmd}': {e}");
395 for mut c in children.into_iter() {
396 let _ = c.kill();
397 let _ = c.wait();
398 }
399 return ExitCode::from(127);
400 }
401 }
402 }
403 let stdin = std::io::stdin();
404 match fanout::run(stdin.lock(), children) {
405 Ok(s) => s,
406 Err(e) => {
407 eprintln!("rusty-pee: fan-out error: {e}");
408 return ExitCode::from(1);
409 }
410 }
411 };
412
413 // Aggregate exit codes (FR-007). Default mode reaches here only; Strict
414 // mode was already dispatched to strict::run() above.
415 let codes: Vec<i32> = statuses.iter().map(|s| s.code().unwrap_or(1)).collect();
416 let aggregated = aggregate::default_max(&codes);
417
418 let byte = if (0..=255).contains(&aggregated) {
419 aggregated as u8
420 } else {
421 1u8
422 };
423 ExitCode::from(byte)
424}