Skip to main content

rusty_pee/
lib.rs

1//! # rusty-pee
2//!
3//! A Rust port of the moreutils `pee` utility: fan a single stdin stream out
4//! to N concurrent shell-spawned children, aggregate their exit codes, and
5//! surface failures cleanly.
6//!
7//! ## Quick start
8//!
9//! ```no_run
10//! use rusty_pee::{PeeBuilder, CompatibilityMode};
11//! use std::io::Cursor;
12//!
13//! // Construct sinks that the builder owns (so they satisfy `'static`).
14//! let sink_a: Vec<u8> = Vec::new();
15//! let sink_b: Vec<u8> = Vec::new();
16//!
17//! let mut pee = PeeBuilder::new()
18//!     .sink(Box::new(sink_a))
19//!     .sink(Box::new(sink_b))
20//!     .compat(CompatibilityMode::Default)
21//!     .build()?;
22//!
23//! let input = Cursor::new(b"alpha\nbravo\ncharlie\n".to_vec());
24//! pee.run(input)?;
25//! # Ok::<(), rusty_pee::Error>(())
26//! ```
27//!
28//! ## Stability (lockstep SemVer)
29//!
30//! Library and binary share a single crate version. Within `0.x`, minor
31//! version bumps may introduce breaking changes per standard Cargo
32//! semantics. Every public enum and struct is `#[non_exhaustive]` so
33//! variant additions are not breaking changes once `1.0` lands.
34//!
35//! ## Pipeline-safety contract
36//!
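//! When a sink errors mid-chunk during [`Pee::run`] and write errors are
//! ignored (the builder default), every other live sink still receives the
//! **complete current chunk** in registration order before the failing sink
//! is dropped from the live-set (mirrors the CLI's `--ignore-write-errors`
//! default — frozen-on per FR-003). With `ignore_write_errors(false)`, the
//! first sink write error instead halts the fan-out and is returned.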
41
42pub mod error;
43
44pub use error::Error;
45
/// Selects between Default-mode ergonomic extensions and Strict moreutils parity.
///
/// # Examples
///
/// ```
/// use rusty_pee::CompatibilityMode;
///
/// // `Default` is the variant produced by `Default::default()`.
/// assert_eq!(CompatibilityMode::default(), CompatibilityMode::Default);
/// // Strict mode rejects `--capture`, `--help`, `--version`, and completions.
/// let _ = CompatibilityMode::Strict;
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum CompatibilityMode {
    /// Default mode: `--help`, `--version`, `--capture`, and the `completions`
    /// subcommand are all honored. Exit aggregation uses `max(child_codes)`.
    #[default]
    Default,
    /// Strict mode: byte-equal moreutils stderr for documented inputs;
    /// exit aggregation uses bitwise OR (matches moreutils `close_pipes()`).
    Strict,
}
68
/// Default fan-out chunk size (64 KiB per AD-015). Not user-configurable in v0.1.0.
///
/// Sizes the read buffer used by [`Pee::run`] and by the CLI's zero-command
/// stdin drain, so it is also the upper bound on the bytes handed to a sink
/// per chunk.
pub const BUFSIZ: usize = 64 * 1024;
71
/// Runtime engine for one pee invocation. Constructed via [`PeeBuilder`].
///
/// # Examples
///
/// ```no_run
/// # use rusty_pee::{PeeBuilder, CompatibilityMode};
/// # use std::io::Cursor;
/// let mut pee = PeeBuilder::new().build()?;
/// let input = Cursor::new(b"hello".to_vec());
/// pee.run(input)?;
/// # Ok::<(), rusty_pee::Error>(())
/// ```
#[non_exhaustive]
pub struct Pee {
    /// Output sinks, stored in registration order; `run` writes to them in
    /// this order.
    sinks: Vec<Box<dyn std::io::Write + Send>>,
    /// Compatibility mode carried over from the builder.
    compat: CompatibilityMode,
    /// Capture flag carried over from the builder (validated against Strict
    /// mode at build time).
    capture: bool,
    /// When `true`, a sink whose write fails is dropped and the fan-out
    /// continues; when `false`, the first write failure is returned.
    ignore_write_errors: bool,
}
91
92impl std::fmt::Debug for Pee {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct("Pee")
95            .field("sinks_count", &self.sinks.len())
96            .field("compat", &self.compat)
97            .field("capture", &self.capture)
98            .field("ignore_write_errors", &self.ignore_write_errors)
99            .finish()
100    }
101}
102
/// Builder for [`Pee`]. All chain methods are `#[must_use]`.
///
/// # Examples
///
/// ```
/// use rusty_pee::{PeeBuilder, CompatibilityMode};
///
/// let pee = PeeBuilder::new()
///     .compat(CompatibilityMode::Default)
///     .build()
///     .expect("default builder always succeeds");
/// # let _ = pee;
/// ```
#[non_exhaustive]
pub struct PeeBuilder {
    /// Sinks registered so far, in registration order.
    sinks: Vec<Box<dyn std::io::Write + Send>>,
    /// Compatibility mode to hand to the built [`Pee`].
    compat: CompatibilityMode,
    /// Capture flag; `build` rejects `true` together with Strict mode.
    capture: bool,
    /// Write-error policy to hand to the built [`Pee`]; `new` sets it to `true`.
    ignore_write_errors: bool,
}
123
124impl std::fmt::Debug for PeeBuilder {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("PeeBuilder")
127            .field("sinks_count", &self.sinks.len())
128            .field("compat", &self.compat)
129            .field("capture", &self.capture)
130            .field("ignore_write_errors", &self.ignore_write_errors)
131            .finish()
132    }
133}
134
135impl Default for PeeBuilder {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl PeeBuilder {
142    /// Construct a new builder defaulting to zero sinks, `Default` compat,
143    /// `capture(false)`, and `ignore_write_errors(true)`.
144    #[must_use]
145    pub fn new() -> Self {
146        Self {
147            sinks: Vec::new(),
148            compat: CompatibilityMode::Default,
149            capture: false,
150            ignore_write_errors: true,
151        }
152    }
153
154    /// Register a sink. Sinks receive the same bytes in registration order.
155    #[must_use]
156    pub fn sink(mut self, sink: Box<dyn std::io::Write + Send>) -> Self {
157        self.sinks.push(sink);
158        self
159    }
160
161    /// Set the compatibility mode.
162    #[must_use]
163    pub fn compat(mut self, compat: CompatibilityMode) -> Self {
164        self.compat = compat;
165        self
166    }
167
168    /// Set capture-mode flag. Library no-op (sinks already capture by definition);
169    /// retained for parity with the CLI flag and validated against Strict mode at build time.
170    #[must_use]
171    pub fn capture(mut self, capture: bool) -> Self {
172        self.capture = capture;
173        self
174    }
175
176    /// Set the ignore-write-errors policy. Default: `true` (matches moreutils
177    /// `--ignore-write-errors` default-on behavior). When `false`, the first
178    /// sink write error halts the fan-out and is surfaced as
179    /// `Error::SinkWriteFailed`.
180    #[must_use]
181    pub fn ignore_write_errors(mut self, ignore: bool) -> Self {
182        self.ignore_write_errors = ignore;
183        self
184    }
185
186    /// Validate and build a [`Pee`].
187    pub fn build(self) -> Result<Pee, Error> {
188        // FR-022: Strict + capture(true) is rejected at build time.
189        if self.compat == CompatibilityMode::Strict && self.capture {
190            return Err(Error::CompatibilityViolation(
191                "--capture not honored in Strict mode",
192            ));
193        }
194        Ok(Pee {
195            sinks: self.sinks,
196            compat: self.compat,
197            capture: self.capture,
198            ignore_write_errors: self.ignore_write_errors,
199        })
200    }
201}
202
203impl Pee {
204    /// Read from `reader` and fan out each chunk to every registered sink in
205    /// registration order. On sink `BrokenPipe`, the sink is dropped from the
206    /// live-set and the fan-out continues to the remaining sinks (FR-003 + FR-036).
207    ///
208    /// **Writer-untouched invariant** (FR-022): every other live sink receives
209    /// the COMPLETE current chunk in registration order before the failing
210    /// sink is dropped.
211    ///
212    /// **Zero-sink case** (FR-006): drains the reader to completion and returns Ok.
213    pub fn run<R: std::io::Read>(&mut self, mut reader: R) -> Result<(), Error> {
214        let _ = (self.compat, self.capture); // hold while we implement
215
216        if self.sinks.is_empty() {
217            // Drain reader to nothing.
218            let mut buf = [0u8; BUFSIZ];
219            loop {
220                let n = reader.read(&mut buf)?;
221                if n == 0 {
222                    break;
223                }
224            }
225            return Ok(());
226        }
227
228        let mut buf = vec![0u8; BUFSIZ];
229        // Live-set: indices into self.sinks that haven't errored yet.
230        let mut live: Vec<usize> = (0..self.sinks.len()).collect();
231
232        loop {
233            let n = reader.read(&mut buf)?;
234            if n == 0 {
235                break;
236            }
237            // FR-036: write the full chunk to each live sink in registration
238            // order before dropping any that error.
239            let mut to_drop: Vec<usize> = Vec::new();
240            for &idx in &live {
241                use std::io::Write;
242                match self.sinks[idx].write_all(&buf[..n]) {
243                    Ok(()) => {}
244                    Err(e)
245                        if e.kind() == std::io::ErrorKind::BrokenPipe
246                            && self.ignore_write_errors =>
247                    {
248                        to_drop.push(idx);
249                    }
250                    Err(e) if self.ignore_write_errors => {
251                        // Non-BrokenPipe write errors with ignore-on: still drop the sink silently.
252                        to_drop.push(idx);
253                        let _ = e;
254                    }
255                    Err(e) => {
256                        return Err(Error::SinkWriteFailed {
257                            sink_index: idx,
258                            source: e,
259                        });
260                    }
261                }
262            }
263            live.retain(|i| !to_drop.contains(i));
264            if live.is_empty() {
265                // All sinks closed; drain reader to nothing.
266                loop {
267                    let n = reader.read(&mut buf)?;
268                    if n == 0 {
269                        break;
270                    }
271                }
272                break;
273            }
274        }
275
276        // Flush surviving sinks.
277        for &idx in &live {
278            use std::io::Write;
279            let _ = self.sinks[idx].flush();
280        }
281        Ok(())
282    }
283}
284
285// CLI / mode / signal / strict / spawner / fanout / aggregate / capture
286// internals are gated behind `cli` because they pull clap, signal-hook, and
287// other binary-only deps. Library callers configure compat mode via the builder.
288#[cfg(feature = "cli")]
289pub mod aggregate;
290#[cfg(feature = "cli")]
291pub mod capture;
292#[cfg(feature = "cli")]
293pub mod cli;
294#[cfg(feature = "cli")]
295pub mod fanout;
296#[cfg(feature = "cli")]
297pub mod mode;
298#[cfg(feature = "cli")]
299pub mod spawner;
300#[cfg(feature = "cli")]
301pub mod strict;
302
/// Binary entry-point helper used by both `src/main.rs` and `src/bin/pee.rs`.
///
/// Flow:
/// 1. Resolve the compatibility mode from the pre-scanned strict flag, the
///    `RUSTY_PEE_STRICT` environment variable, and `argv[0]`; Strict mode is
///    handed to `strict::run` without involving clap.
/// 2. Parse the command line with clap. Help/version output exits 0, any
///    other parse error exits 2.
/// 3. Handle the `completions` subcommand, then the zero-command stdin drain
///    (FR-006).
/// 4. Spawn the children (capture mode or plain fan-out) and feed them stdin.
///    Spawn failures exit 127; capture/fan-out failures exit 1.
/// 5. Aggregate the child exit codes.
///
/// Per FR-007/FR-008/AD-002/AD-003: Default-mode exit aggregation uses
/// `max(child_codes)`; Strict mode uses bitwise OR.
#[cfg(feature = "cli")]
pub fn run() -> std::process::ExitCode {
    use clap::error::ErrorKind;
    use clap::{CommandFactory, Parser};
    use std::ffi::OsString;
    use std::io::Read;
    use std::process::ExitCode;

    let raw_argv: Vec<OsString> = std::env::args_os().collect();

    // Strict-mode dispatch happens before clap sees anything: it depends on
    // `--strict` / `--no-strict`, the environment, and the invoked name.
    let strict_flag = strict::pre_scan_strict_flag(&raw_argv);
    let strict_env = std::env::var_os("RUSTY_PEE_STRICT");
    let invoked_as = raw_argv.first().cloned();
    let resolved_mode = mode::resolve(strict_flag, strict_env.as_deref(), invoked_as.as_deref());

    if resolved_mode == CompatibilityMode::Strict {
        return strict::run(&raw_argv);
    }

    let cli_args = match cli::Cli::try_parse() {
        Ok(parsed) => parsed,
        Err(err) => {
            err.print().ok();
            // Help and version requests arrive as clap "errors" but are a
            // successful outcome for the user.
            let informational =
                matches!(err.kind(), ErrorKind::DisplayHelp | ErrorKind::DisplayVersion);
            return if informational {
                ExitCode::SUCCESS
            } else {
                ExitCode::from(2)
            };
        }
    };

    // Completions subcommand (Default only).
    if let Some(cli::Subcommand::Completions { shell }) = cli_args.command {
        let mut command = cli::Cli::command();
        let bin_name = command.get_name().to_string();
        clap_complete::generate(shell, &mut command, bin_name, &mut std::io::stdout());
        return ExitCode::SUCCESS;
    }

    // FR-006: zero commands → drain stdin and exit 0.
    if cli_args.commands.is_empty() {
        let stdin = std::io::stdin();
        let mut input = stdin.lock();
        let mut scratch = [0u8; BUFSIZ];
        loop {
            match input.read(&mut scratch) {
                Ok(0) => return ExitCode::SUCCESS,
                Ok(_) => continue,
                Err(e) => {
                    eprintln!("rusty-pee: stdin read error: {e}");
                    return ExitCode::from(1);
                }
            }
        }
    }

    let statuses = if cli_args.capture {
        // Capture mode (FR-017): each child's stdout is piped, drained in
        // parallel by worker threads, and emitted in argv order.
        let children = match capture::spawn_all_piped(&cli_args.commands) {
            Ok(spawned) => spawned,
            Err(e) => {
                eprintln!("rusty-pee: failed to spawn child: {e}");
                return ExitCode::from(127);
            }
        };
        let stdin = std::io::stdin();
        let stdout = std::io::stdout();
        let mut out = stdout.lock();
        match capture::run_with_capture(stdin.lock(), children, &mut out) {
            Ok(done) => done,
            Err(e) => {
                eprintln!("rusty-pee: capture error: {e}");
                return ExitCode::from(1);
            }
        }
    } else {
        // Default fan-out (FR-002, FR-003, FR-004) — children inherit parent stdio.
        let mut children = Vec::with_capacity(cli_args.commands.len());
        for cmd in &cli_args.commands {
            let child = match spawner::spawn_one(cmd) {
                Ok(child) => child,
                Err(e) => {
                    eprintln!("rusty-pee: failed to spawn child '{cmd}': {e}");
                    // Do not leave already-started children running.
                    for mut orphan in children {
                        let _ = orphan.kill();
                        let _ = orphan.wait();
                    }
                    return ExitCode::from(127);
                }
            };
            children.push(child);
        }
        let stdin = std::io::stdin();
        match fanout::run(stdin.lock(), children) {
            Ok(done) => done,
            Err(e) => {
                eprintln!("rusty-pee: fan-out error: {e}");
                return ExitCode::from(1);
            }
        }
    };

    // Aggregate exit codes (FR-007). Only Default mode reaches this point;
    // Strict mode was dispatched to strict::run() above. A status without an
    // exit code counts as 1.
    let codes: Vec<i32> = statuses
        .iter()
        .map(|status| status.code().unwrap_or(1))
        .collect();
    let aggregated = aggregate::default_max(&codes);

    // Anything outside the 0..=255 exit-status range collapses to 1.
    ExitCode::from(u8::try_from(aggregated).unwrap_or(1))
}