Skip to main content

qubit_command/
command_runner.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    fs::File,
11    io::{
12        self,
13        Read,
14        Write,
15    },
16    path::{
17        Path,
18        PathBuf,
19    },
20    process::{
21        Command as ProcessCommand,
22        ExitStatus,
23        Stdio,
24    },
25    thread,
26    time::Duration,
27};
28
29#[cfg(windows)]
30use process_wrap::std::JobObject;
31#[cfg(unix)]
32use process_wrap::std::ProcessGroup;
33use process_wrap::std::{
34    ChildWrapper,
35    CommandWrap,
36};
37
38pub(crate) mod captured_output;
39pub(crate) mod command_io;
40pub(crate) mod finished_command;
41pub(crate) mod managed_child_process;
42pub(crate) mod output_capture_error;
43pub(crate) mod output_capture_options;
44pub(crate) mod output_reader;
45pub(crate) mod output_tee;
46pub(crate) mod running_command;
47pub(crate) mod stdin_writer;
48
49use captured_output::CapturedOutput;
50use command_io::CommandIo;
51use finished_command::FinishedCommand;
52use managed_child_process::ManagedChildProcess;
53use output_capture_error::OutputCaptureError;
54use output_capture_options::OutputCaptureOptions;
55use output_reader::OutputReader;
56use running_command::RunningCommand;
57use stdin_writer::StdinWriter;
58
59use crate::command_stdin::CommandStdin;
60use crate::{
61    Command,
62    CommandError,
63    CommandOutput,
64    OutputStream,
65};
66
67/// Predefined ten-second timeout value.
68///
69/// `CommandRunner::new` does not apply this timeout automatically. Use this
70/// constant with [`CommandRunner::timeout`] when callers want a short, explicit
71/// command limit.
72pub const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(10);
73
74/// Polling interval used while waiting for a child process with timeout.
75pub(crate) const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
76
77/// Runs external commands and captures their output.
78///
79/// `CommandRunner` runs one [`Command`] synchronously on the caller thread and
80/// returns captured process output. The runner always preserves raw output
81/// bytes. Its lossy-output option controls whether [`CommandOutput::stdout`]
82/// and [`CommandOutput::stderr`] reject invalid UTF-8 or return replacement
83/// characters.
84///
85/// # Author
86///
87/// Haixing Hu
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct CommandRunner {
90    /// Maximum duration allowed for each command.
91    timeout: Option<Duration>,
92    /// Default working directory used when a command does not override it.
93    working_directory: Option<PathBuf>,
94    /// Exit codes treated as successful.
95    success_exit_codes: Vec<i32>,
96    /// Whether command execution logs are disabled.
97    disable_logging: bool,
98    /// Whether captured text accessors should replace invalid UTF-8 bytes.
99    lossy_output: bool,
100    /// Maximum stdout bytes retained in memory.
101    max_stdout_bytes: Option<usize>,
102    /// Maximum stderr bytes retained in memory.
103    max_stderr_bytes: Option<usize>,
104    /// File that receives a streaming copy of stdout.
105    stdout_file: Option<PathBuf>,
106    /// File that receives a streaming copy of stderr.
107    stderr_file: Option<PathBuf>,
108}
109
110impl Default for CommandRunner {
111    /// Creates a command runner with the default exit-code policy.
112    ///
113    /// # Returns
114    ///
115    /// A runner with no timeout, inherited working directory, success exit code
116    /// `0`, strict UTF-8 output text accessors, unlimited in-memory output
117    /// capture, and no output tee files.
118    #[inline]
119    fn default() -> Self {
120        Self {
121            timeout: None,
122            working_directory: None,
123            success_exit_codes: vec![0],
124            disable_logging: false,
125            lossy_output: false,
126            max_stdout_bytes: None,
127            max_stderr_bytes: None,
128            stdout_file: None,
129            stderr_file: None,
130        }
131    }
132}
133
134impl CommandRunner {
135    /// Creates a command runner with default settings.
136    ///
137    /// # Returns
138    ///
139    /// A runner with no timeout, inherited working directory, success exit code
140    /// `0`, strict UTF-8 output text accessors, unlimited in-memory output
141    /// capture, and no output tee files.
142    #[inline]
143    pub fn new() -> Self {
144        Self::default()
145    }
146
147    /// Sets the command timeout.
148    ///
149    /// # Parameters
150    ///
151    /// * `timeout` - Maximum duration allowed for each command.
152    ///
153    /// # Returns
154    ///
155    /// The updated command runner.
156    #[inline]
157    pub const fn timeout(mut self, timeout: Duration) -> Self {
158        self.timeout = Some(timeout);
159        self
160    }
161
162    /// Disables timeout handling.
163    ///
164    /// # Returns
165    ///
166    /// The updated command runner.
167    #[inline]
168    pub const fn without_timeout(mut self) -> Self {
169        self.timeout = None;
170        self
171    }
172
173    /// Sets the default working directory.
174    ///
175    /// # Parameters
176    ///
177    /// * `working_directory` - Directory used when a command has no
178    ///   per-command working directory override.
179    ///
180    /// # Returns
181    ///
182    /// The updated command runner.
183    #[inline]
184    pub fn working_directory<P>(mut self, working_directory: P) -> Self
185    where
186        P: Into<PathBuf>,
187    {
188        self.working_directory = Some(working_directory.into());
189        self
190    }
191
192    /// Sets the only exit code treated as successful.
193    ///
194    /// # Parameters
195    ///
196    /// * `exit_code` - Exit code considered successful.
197    ///
198    /// # Returns
199    ///
200    /// The updated command runner.
201    #[inline]
202    pub fn success_exit_code(mut self, exit_code: i32) -> Self {
203        self.success_exit_codes = vec![exit_code];
204        self
205    }
206
207    /// Sets all exit codes treated as successful.
208    ///
209    /// # Parameters
210    ///
211    /// * `exit_codes` - Exit codes considered successful.
212    ///
213    /// # Returns
214    ///
215    /// The updated command runner.
216    #[inline]
217    pub fn success_exit_codes(mut self, exit_codes: &[i32]) -> Self {
218        self.success_exit_codes = exit_codes.to_vec();
219        self
220    }
221
222    /// Enables or disables command execution logs.
223    ///
224    /// # Parameters
225    ///
226    /// * `disable_logging` - `true` to suppress runner logs.
227    ///
228    /// # Returns
229    ///
230    /// The updated command runner.
231    #[inline]
232    pub const fn disable_logging(mut self, disable_logging: bool) -> Self {
233        self.disable_logging = disable_logging;
234        self
235    }
236
237    /// Configures whether output text accessors use lossy UTF-8 conversion.
238    ///
239    /// # Parameters
240    ///
241    /// * `lossy_output` - `true` to replace invalid UTF-8 bytes with the
242    ///   Unicode replacement character when [`CommandOutput::stdout`] or
243    ///   [`CommandOutput::stderr`] is called.
244    ///
245    /// # Returns
246    ///
247    /// The updated command runner.
248    #[inline]
249    pub const fn lossy_output(mut self, lossy_output: bool) -> Self {
250        self.lossy_output = lossy_output;
251        self
252    }
253
254    /// Sets the maximum stdout bytes retained in memory.
255    ///
256    /// The reader still drains the complete stdout stream. Bytes beyond this
257    /// limit are not retained in [`CommandOutput`], but they are still written to
258    /// a configured stdout tee file.
259    ///
260    /// # Parameters
261    ///
262    /// * `max_bytes` - Maximum number of stdout bytes to retain.
263    ///
264    /// # Returns
265    ///
266    /// The updated command runner.
267    #[inline]
268    pub const fn max_stdout_bytes(mut self, max_bytes: usize) -> Self {
269        self.max_stdout_bytes = Some(max_bytes);
270        self
271    }
272
273    /// Sets the maximum stderr bytes retained in memory.
274    ///
275    /// The reader still drains the complete stderr stream. Bytes beyond this
276    /// limit are not retained in [`CommandOutput`], but they are still written to
277    /// a configured stderr tee file.
278    ///
279    /// # Parameters
280    ///
281    /// * `max_bytes` - Maximum number of stderr bytes to retain.
282    ///
283    /// # Returns
284    ///
285    /// The updated command runner.
286    #[inline]
287    pub const fn max_stderr_bytes(mut self, max_bytes: usize) -> Self {
288        self.max_stderr_bytes = Some(max_bytes);
289        self
290    }
291
292    /// Sets the same in-memory capture limit for stdout and stderr.
293    ///
294    /// # Parameters
295    ///
296    /// * `max_bytes` - Maximum number of bytes retained for each stream.
297    ///
298    /// # Returns
299    ///
300    /// The updated command runner.
301    #[inline]
302    pub const fn max_output_bytes(mut self, max_bytes: usize) -> Self {
303        self.max_stdout_bytes = Some(max_bytes);
304        self.max_stderr_bytes = Some(max_bytes);
305        self
306    }
307
308    /// Streams stdout to a file while still capturing it in memory.
309    ///
310    /// The file is created or truncated before the command is spawned. Combine
311    /// this with [`Self::max_stdout_bytes`] to avoid unbounded memory use for
312    /// large stdout streams.
313    ///
314    /// # Parameters
315    ///
316    /// * `path` - Destination file path for stdout bytes.
317    ///
318    /// # Returns
319    ///
320    /// The updated command runner.
321    #[inline]
322    pub fn tee_stdout_to_file<P>(mut self, path: P) -> Self
323    where
324        P: Into<PathBuf>,
325    {
326        self.stdout_file = Some(path.into());
327        self
328    }
329
330    /// Streams stderr to a file while still capturing it in memory.
331    ///
332    /// The file is created or truncated before the command is spawned. Combine
333    /// this with [`Self::max_stderr_bytes`] to avoid unbounded memory use for
334    /// large stderr streams.
335    ///
336    /// # Parameters
337    ///
338    /// * `path` - Destination file path for stderr bytes.
339    ///
340    /// # Returns
341    ///
342    /// The updated command runner.
343    #[inline]
344    pub fn tee_stderr_to_file<P>(mut self, path: P) -> Self
345    where
346        P: Into<PathBuf>,
347    {
348        self.stderr_file = Some(path.into());
349        self
350    }
351
352    /// Returns the configured timeout.
353    ///
354    /// # Returns
355    ///
356    /// `Some(duration)` when timeout handling is enabled, otherwise `None`.
357    #[inline]
358    pub const fn configured_timeout(&self) -> Option<Duration> {
359        self.timeout
360    }
361
362    /// Returns the default working directory.
363    ///
364    /// # Returns
365    ///
366    /// `Some(path)` when a default working directory is configured, otherwise
367    /// `None` to inherit the current process working directory.
368    #[inline]
369    pub fn configured_working_directory(&self) -> Option<&Path> {
370        self.working_directory.as_deref()
371    }
372
373    /// Returns the configured successful exit codes.
374    ///
375    /// # Returns
376    ///
377    /// Borrowed list of exit codes treated as successful.
378    #[inline]
379    pub fn configured_success_exit_codes(&self) -> &[i32] {
380        &self.success_exit_codes
381    }
382
383    /// Returns whether logging is disabled.
384    ///
385    /// # Returns
386    ///
387    /// `true` when runner logs are disabled.
388    #[inline]
389    pub const fn is_logging_disabled(&self) -> bool {
390        self.disable_logging
391    }
392
393    /// Returns whether output text accessors use lossy UTF-8 conversion.
394    ///
395    /// # Returns
396    ///
397    /// `true` when invalid UTF-8 bytes are replaced before output is returned
398    /// by [`CommandOutput::stdout`] or [`CommandOutput::stderr`].
399    #[inline]
400    pub const fn is_lossy_output_enabled(&self) -> bool {
401        self.lossy_output
402    }
403
404    /// Returns the configured stdout capture limit.
405    ///
406    /// # Returns
407    ///
408    /// `Some(max_bytes)` when stdout capture is limited, otherwise `None`.
409    #[inline]
410    pub const fn configured_max_stdout_bytes(&self) -> Option<usize> {
411        self.max_stdout_bytes
412    }
413
414    /// Returns the configured stderr capture limit.
415    ///
416    /// # Returns
417    ///
418    /// `Some(max_bytes)` when stderr capture is limited, otherwise `None`.
419    #[inline]
420    pub const fn configured_max_stderr_bytes(&self) -> Option<usize> {
421        self.max_stderr_bytes
422    }
423
424    /// Returns the stdout tee file path.
425    ///
426    /// # Returns
427    ///
428    /// `Some(path)` when stdout is streamed to a file, otherwise `None`.
429    #[inline]
430    pub fn configured_stdout_file(&self) -> Option<&Path> {
431        self.stdout_file.as_deref()
432    }
433
434    /// Returns the stderr tee file path.
435    ///
436    /// # Returns
437    ///
438    /// `Some(path)` when stderr is streamed to a file, otherwise `None`.
439    #[inline]
440    pub fn configured_stderr_file(&self) -> Option<&Path> {
441        self.stderr_file.as_deref()
442    }
443
444    /// Runs a command and captures stdout and stderr.
445    ///
446    /// This method blocks the caller thread until the child process exits or
447    /// the configured timeout is reached. When a timeout is configured, Unix
448    /// children run as leaders of new process groups and Windows children run
449    /// in Job Objects. This lets timeout killing target the process tree
450    /// instead of only the direct child process. Without a configured timeout,
451    /// commands use the platform's normal process-spawning behavior.
452    ///
453    /// Captured output is retained as raw bytes up to the configured per-stream
454    /// limits. Reader threads still drain complete streams so the child is not
455    /// blocked on full pipes. If lossy output mode is enabled, invalid UTF-8 is
456    /// replaced only for [`CommandOutput::stdout`] and
457    /// [`CommandOutput::stderr`]; byte accessors still return the retained raw
458    /// process output.
459    ///
460    /// # Parameters
461    ///
462    /// * `command` - Structured command to run.
463    ///
464    /// # Returns
465    ///
466    /// Captured output when the process exits with a configured success code.
467    ///
468    /// # Errors
469    ///
470    /// Returns [`CommandError`] if the process cannot be spawned, cannot be
471    /// waited on, times out, cannot be killed after timing out, emits output
472    /// that cannot be read or written to a tee file, cannot receive configured
473    /// stdin, or exits with a code not configured as successful.
474    pub fn run(&self, command: Command) -> Result<CommandOutput, CommandError> {
475        let command_text = command.display_command();
476        if !self.disable_logging {
477            log::info!("Running command: {command_text}");
478        }
479
480        let mut process_command = ProcessCommand::new(command.program());
481        process_command.args(command.arguments());
482        process_command.stdout(Stdio::piped());
483        process_command.stderr(Stdio::piped());
484
485        if let Some(working_directory) = command
486            .working_directory_override()
487            .or(self.working_directory.as_deref())
488        {
489            process_command.current_dir(working_directory);
490        }
491
492        configure_environment(&command, &mut process_command);
493        let stdin_configuration = command.into_stdin_configuration();
494        let stdin_bytes =
495            configure_stdin(&command_text, stdin_configuration, &mut process_command)?;
496
497        let stdout_file = open_output_file(
498            &command_text,
499            OutputStream::Stdout,
500            self.stdout_file.as_deref(),
501        )?;
502        let stderr_file = open_output_file(
503            &command_text,
504            OutputStream::Stderr,
505            self.stderr_file.as_deref(),
506        )?;
507
508        let mut child_process = match spawn_child(process_command, self.timeout.is_some()) {
509            Ok(child_process) => child_process,
510            Err(source) => return Err(spawn_failed(&command_text, source)),
511        };
512
513        let stdin_writer = write_stdin_bytes(&command_text, child_process.as_mut(), stdin_bytes)?;
514
515        let stdout = match child_process.stdout().take() {
516            Some(stdout) => stdout,
517            None => return Err(output_pipe_error(&command_text, OutputStream::Stdout)),
518        };
519        let stderr = match child_process.stderr().take() {
520            Some(stderr) => stderr,
521            None => return Err(output_pipe_error(&command_text, OutputStream::Stderr)),
522        };
523        let stdout_reader = read_output_stream(
524            Box::new(stdout),
525            OutputCaptureOptions::new(self.max_stdout_bytes, stdout_file, self.stdout_file.clone()),
526        );
527        let stderr_reader = read_output_stream(
528            Box::new(stderr),
529            OutputCaptureOptions::new(self.max_stderr_bytes, stderr_file, self.stderr_file.clone()),
530        );
531        let command_io = CommandIo::new(stdout_reader, stderr_reader, stdin_writer);
532        let finished =
533            RunningCommand::new(command_text, child_process, command_io, self.lossy_output)
534                .wait_for_completion(self.timeout)?;
535        let FinishedCommand {
536            command_text,
537            output,
538        } = finished;
539
540        if output
541            .exit_code()
542            .is_some_and(|exit_code| self.success_exit_codes.contains(&exit_code))
543        {
544            if !self.disable_logging {
545                log::info!(
546                    "Finished command `{}` in {:?}.",
547                    command_text,
548                    output.elapsed()
549                );
550            }
551            Ok(output)
552        } else {
553            if !self.disable_logging {
554                log::error!(
555                    "Command `{}` exited with code {:?}.",
556                    command_text,
557                    output.exit_code()
558                );
559            }
560            Err(CommandError::UnexpectedExit {
561                command: command_text,
562                exit_code: output.exit_code(),
563                expected: self.success_exit_codes.clone(),
564                output: Box::new(output),
565            })
566        }
567    }
568}
569
570/// Configures stdin for a process command.
571///
572/// # Parameters
573///
574/// * `command_text` - Human-readable command text for diagnostics.
575/// * `stdin` - Owned stdin configuration for the command.
576/// * `process_command` - Process command being prepared.
577///
578/// # Returns
579///
580/// `Some(bytes)` when stdin bytes must be written after spawning, otherwise
581/// `None`.
582///
583/// # Errors
584///
585/// Returns [`CommandError::OpenInputFailed`] when a configured stdin file cannot
586/// be opened.
587fn configure_stdin(
588    command_text: &str,
589    stdin: CommandStdin,
590    process_command: &mut ProcessCommand,
591) -> Result<Option<Vec<u8>>, CommandError> {
592    match stdin {
593        CommandStdin::Null => {
594            process_command.stdin(Stdio::null());
595            Ok(None)
596        }
597        CommandStdin::Inherit => {
598            process_command.stdin(Stdio::inherit());
599            Ok(None)
600        }
601        CommandStdin::Bytes(bytes) => {
602            process_command.stdin(Stdio::piped());
603            Ok(Some(bytes))
604        }
605        CommandStdin::File(path) => match File::open(&path) {
606            Ok(file) => {
607                process_command.stdin(Stdio::from(file));
608                Ok(None)
609            }
610            Err(source) => Err(CommandError::OpenInputFailed {
611                command: command_text.to_owned(),
612                path,
613                source,
614            }),
615        },
616    }
617}
618
619/// Configures environment variables for a process command.
620///
621/// # Parameters
622///
623/// * `command` - Command environment configuration.
624/// * `process_command` - Process command being prepared.
625fn configure_environment(command: &Command, process_command: &mut ProcessCommand) {
626    if command.clears_environment() {
627        process_command.env_clear();
628    }
629    for key in command.removed_environment() {
630        process_command.env_remove(key);
631    }
632    for (key, value) in command.environment() {
633        process_command.env(key, value);
634    }
635}
636
637/// Spawns a child process with platform process-tree support.
638///
639/// # Parameters
640///
641/// * `process_command` - Prepared standard-library process command.
642/// * `kill_process_tree` - Whether timeout handling needs process-tree
643///   termination support.
644///
645/// # Returns
646///
647/// A child process. When `kill_process_tree` is `true`, Unix children are
648/// placed in a new process group and Windows children are placed in a Job
649/// Object.
650///
651/// # Errors
652///
653/// Returns the I/O error reported by process spawning or wrapper setup.
654fn spawn_child(
655    process_command: ProcessCommand,
656    kill_process_tree: bool,
657) -> io::Result<ManagedChildProcess> {
658    #[cfg(coverage)]
659    if crate::coverage_support::fake_children_enabled()
660        && let Some(child) = crate::coverage_support::fake_child_for(process_command.get_program())
661    {
662        return Ok(child);
663    }
664
665    let mut command = CommandWrap::from(process_command);
666    #[cfg(unix)]
667    if kill_process_tree {
668        command.wrap(ProcessGroup::leader());
669    }
670    #[cfg(windows)]
671    if kill_process_tree {
672        command.wrap(JobObject);
673    }
674    command.spawn()
675}
676
677/// Opens an output tee file before spawning the child.
678///
679/// # Parameters
680///
681/// * `command` - Human-readable command text for diagnostics.
682/// * `stream` - Stream associated with the file.
683/// * `path` - Optional file path.
684///
685/// # Returns
686///
687/// Open file handle when a path is configured, otherwise `None`.
688///
689/// # Errors
690///
691/// Returns [`CommandError::OpenOutputFailed`] when the path cannot be opened.
692fn open_output_file(
693    command: &str,
694    stream: OutputStream,
695    path: Option<&Path>,
696) -> Result<Option<File>, CommandError> {
697    match path {
698        Some(path) => {
699            File::create(path)
700                .map(Some)
701                .map_err(|source| CommandError::OpenOutputFailed {
702                    command: command.to_owned(),
703                    stream,
704                    path: path.to_path_buf(),
705                    source,
706                })
707        }
708        None => Ok(None),
709    }
710}
711
712/// Starts a helper thread that writes configured stdin bytes.
713///
714/// # Parameters
715///
716/// * `command` - Human-readable command text for diagnostics.
717/// * `child` - Spawned child process wrapper.
718/// * `stdin_bytes` - Optional bytes to write to stdin.
719///
720/// # Returns
721///
722/// Join handle for the stdin writer, if stdin bytes were configured.
723///
724/// # Errors
725///
726/// Returns [`CommandError::WriteInputFailed`] if stdin bytes were configured but
727/// the child stdin pipe was not available.
728pub(crate) fn write_stdin_bytes(
729    command: &str,
730    child: &mut dyn ChildWrapper,
731    stdin_bytes: Option<Vec<u8>>,
732) -> Result<StdinWriter, CommandError> {
733    match stdin_bytes {
734        Some(bytes) => match child.stdin().take() {
735            Some(mut stdin) => Ok(Some(thread::spawn(move || stdin.write_all(&bytes)))),
736            None => Err(CommandError::WriteInputFailed {
737                command: command.to_owned(),
738                source: io::Error::other("stdin pipe was not created"),
739            }),
740        },
741        None => Ok(None),
742    }
743}
744
745/// Spawns a reader thread for a child output stream.
746///
747/// # Parameters
748///
749/// * `reader` - Child process output pipe.
750/// * `options` - Capture and tee-file options.
751///
752/// # Returns
753///
754/// Join handle resolving to captured output bytes and truncation metadata.
755fn read_output_stream(
756    mut reader: Box<dyn Read + Send>,
757    options: OutputCaptureOptions,
758) -> OutputReader {
759    thread::spawn(move || read_output(reader.as_mut(), options))
760}
761
762/// Reads one child output stream to completion.
763///
764/// # Parameters
765///
766/// * `reader` - Pipe reader to drain.
767/// * `options` - Capture and tee-file options.
768///
769/// # Returns
770///
771/// Captured bytes and truncation metadata.
772///
773/// # Errors
774///
775/// Returns [`OutputCaptureError`] if reading the pipe or writing the tee file
776/// fails. Tee-file write failures are recorded while the reader continues
777/// draining the child pipe so the child is not blocked by a full output pipe.
778pub(crate) fn read_output(
779    reader: &mut dyn Read,
780    mut options: OutputCaptureOptions,
781) -> Result<CapturedOutput, OutputCaptureError> {
782    let mut bytes = Vec::new();
783    if let Some(max_bytes) = options.max_bytes {
784        bytes.reserve(max_bytes.min(8 * 1024));
785    }
786    let mut truncated = false;
787    let mut write_error = None;
788    let mut buffer = [0_u8; 8 * 1024];
789    loop {
790        let read = reader.read(&mut buffer).map_err(OutputCaptureError::Read)?;
791        if read == 0 {
792            break;
793        }
794        let chunk = &buffer[..read];
795        if let Some(tee) = options.tee.as_mut()
796            && write_error.is_none()
797            && let Err(source) = tee.writer.write_all(chunk)
798        {
799            write_error = Some(OutputCaptureError::Write {
800                path: tee.path.clone(),
801                source,
802            });
803            options.tee = None;
804        }
805        match options.max_bytes {
806            Some(max_bytes) => {
807                let remaining = max_bytes.saturating_sub(bytes.len());
808                if remaining > 0 {
809                    let retained = remaining.min(chunk.len());
810                    bytes.extend_from_slice(&chunk[..retained]);
811                }
812                if chunk.len() > remaining {
813                    truncated = true;
814                }
815            }
816            None => bytes.extend_from_slice(chunk),
817        }
818    }
819    if write_error.is_none()
820        && let Some(tee) = options.tee.as_mut()
821        && let Err(source) = tee.writer.flush()
822    {
823        write_error = Some(OutputCaptureError::Write {
824            path: tee.path.clone(),
825            source,
826        });
827    }
828    if let Some(error) = write_error {
829        Err(error)
830    } else {
831        Ok(CapturedOutput { bytes, truncated })
832    }
833}
834
835/// Builds a process spawn failure.
836///
837/// # Parameters
838///
839/// * `command` - Human-readable command text for diagnostics.
840/// * `source` - I/O error reported by process spawning.
841///
842/// # Returns
843///
844/// A command error preserving the command text and source error.
845pub(crate) fn spawn_failed(command: &str, source: io::Error) -> CommandError {
846    CommandError::SpawnFailed {
847        command: command.to_owned(),
848        source,
849    }
850}
851
852/// Builds a process wait failure.
853///
854/// # Parameters
855///
856/// * `command` - Human-readable command text for diagnostics.
857/// * `source` - I/O error reported while waiting for the process.
858///
859/// # Returns
860///
861/// A command error preserving the command text and source error.
862pub(crate) fn wait_failed(command: &str, source: io::Error) -> CommandError {
863    CommandError::WaitFailed {
864        command: command.to_owned(),
865        source,
866    }
867}
868
869/// Builds a timed-out process kill failure.
870///
871/// # Parameters
872///
873/// * `command` - Human-readable command text for diagnostics.
874/// * `timeout` - Timeout that had been exceeded.
875/// * `source` - I/O error reported while killing the process.
876///
877/// # Returns
878///
879/// A command error preserving timeout and kill-failure context.
880pub(crate) fn kill_failed(command: String, timeout: Duration, source: io::Error) -> CommandError {
881    CommandError::KillFailed {
882        command,
883        timeout,
884        source,
885    }
886}
887
888/// Collects reader-thread results into a command output value.
889///
890/// # Parameters
891///
892/// * `command` - Human-readable command text for diagnostics.
893/// * `status` - Process exit status.
894/// * `elapsed` - Observed command duration.
895/// * `lossy_output` - Whether output text accessors should replace invalid
896///   UTF-8 bytes.
897/// * `stdout_reader` - Reader thread for stdout.
898/// * `stderr_reader` - Reader thread for stderr.
899/// * `stdin_writer` - Optional writer thread for configured stdin bytes.
900///
901/// # Returns
902///
903/// Command output containing both captured streams.
904///
905/// # Errors
906///
907/// Returns [`CommandError`] if stream collection or stdin writing fails. All
908/// helper threads are joined before an error is returned.
909pub(crate) fn collect_output(
910    command: &str,
911    status: ExitStatus,
912    elapsed: Duration,
913    lossy_output: bool,
914    stdout_reader: OutputReader,
915    stderr_reader: OutputReader,
916    stdin_writer: StdinWriter,
917) -> Result<CommandOutput, CommandError> {
918    #[cfg(coverage)]
919    crate::coverage_support::record_collect_output(command);
920
921    #[cfg(coverage)]
922    let forced_error =
923        crate::coverage_support::forced_collect_output_error(command).map(|stream| {
924            CommandError::ReadOutputFailed {
925                command: command.to_owned(),
926                stream,
927                source: io::Error::other("forced output collection failure"),
928            }
929        });
930    #[cfg(not(coverage))]
931    let forced_error = None;
932
933    let stdout_result = join_output_reader(command, OutputStream::Stdout, stdout_reader);
934    let stderr_result = join_output_reader(command, OutputStream::Stderr, stderr_reader);
935    let stdin_result = join_stdin_writer(command, stdin_writer);
936
937    match (stdout_result, stderr_result, stdin_result, forced_error) {
938        (Ok(stdout), Ok(stderr), Ok(()), None) => Ok(CommandOutput::new(
939            status,
940            stdout.bytes,
941            stderr.bytes,
942            stdout.truncated,
943            stderr.truncated,
944            elapsed,
945            lossy_output,
946        )),
947        (Err(error), _, _, _)
948        | (_, Err(error), _, _)
949        | (_, _, Err(error), _)
950        | (_, _, _, Some(error)) => Err(error),
951    }
952}
953
954/// Joins one output reader and maps failures to command errors.
955///
956/// # Parameters
957///
958/// * `command` - Human-readable command text for diagnostics.
959/// * `stream` - Stream associated with the reader.
960/// * `reader` - Join handle to collect.
961///
962/// # Returns
963///
964/// Captured bytes and truncation metadata for the requested stream.
965///
966/// # Errors
967///
968/// Returns [`CommandError`] when the reader reports I/O failure, tee-file write
969/// failure, or panics.
970pub(crate) fn join_output_reader(
971    command: &str,
972    stream: OutputStream,
973    reader: OutputReader,
974) -> Result<CapturedOutput, CommandError> {
975    match reader.join() {
976        Ok(Ok(output)) => Ok(output),
977        Ok(Err(OutputCaptureError::Read(source))) => Err(CommandError::ReadOutputFailed {
978            command: command.to_owned(),
979            stream,
980            source,
981        }),
982        Ok(Err(OutputCaptureError::Write { path, source })) => {
983            Err(CommandError::WriteOutputFailed {
984                command: command.to_owned(),
985                stream,
986                path,
987                source,
988            })
989        }
990        Err(_) => Err(CommandError::ReadOutputFailed {
991            command: command.to_owned(),
992            stream,
993            source: io::Error::other("output reader thread panicked"),
994        }),
995    }
996}
997
998/// Joins the stdin writer and maps failures to command errors.
999///
1000/// # Parameters
1001///
1002/// * `command` - Human-readable command text for diagnostics.
1003/// * `writer` - Optional stdin writer thread.
1004///
1005/// # Errors
1006///
1007/// Returns [`CommandError::WriteInputFailed`] when writing stdin fails or the
1008/// writer thread panics. A broken pipe is ignored because it only means the
1009/// child closed stdin before consuming all configured bytes; the process exit
1010/// status remains the authoritative command result.
1011pub(crate) fn join_stdin_writer(command: &str, writer: StdinWriter) -> Result<(), CommandError> {
1012    match writer {
1013        Some(writer) => match writer.join() {
1014            Ok(Ok(())) => Ok(()),
1015            Ok(Err(source)) if source.kind() == io::ErrorKind::BrokenPipe => Ok(()),
1016            Ok(Err(source)) => Err(CommandError::WriteInputFailed {
1017                command: command.to_owned(),
1018                source,
1019            }),
1020            Err(_) => Err(CommandError::WriteInputFailed {
1021                command: command.to_owned(),
1022                source: io::Error::other("stdin writer thread panicked"),
1023            }),
1024        },
1025        None => Ok(()),
1026    }
1027}
1028
1029/// Builds an internal missing-pipe error.
1030///
1031/// # Parameters
1032///
1033/// * `command` - Human-readable command text for diagnostics.
1034/// * `stream` - Missing output stream.
1035///
1036/// # Returns
1037///
1038/// A command error describing the missing pipe.
1039pub(crate) fn output_pipe_error(command: &str, stream: OutputStream) -> CommandError {
1040    CommandError::ReadOutputFailed {
1041        command: command.to_owned(),
1042        stream,
1043        source: io::Error::other(format!("{} pipe was not created", stream.as_str())),
1044    }
1045}
1046
1047/// Calculates how long to sleep before polling the child again.
1048///
1049/// # Parameters
1050///
1051/// * `timeout` - Optional command timeout.
1052/// * `elapsed` - Elapsed command duration.
1053///
1054/// # Returns
1055///
1056/// A short polling delay that does not intentionally sleep past the timeout.
1057pub(crate) fn next_sleep(timeout: Option<Duration>, elapsed: Duration) -> Duration {
1058    if let Some(timeout) = timeout
1059        && let Some(remaining) = timeout.checked_sub(elapsed)
1060    {
1061        return remaining.min(WAIT_POLL_INTERVAL);
1062    }
1063    WAIT_POLL_INTERVAL
1064}