use std::io::{self, Read};
use std::process::{ChildStdin, ChildStdout, ExitStatus};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use shared_child::SharedChild;
use crate::cmd::RunOutput;
use crate::cmd_display::CmdDisplay;
use crate::error::{RunError, truncate_suffix, truncate_suffix_string};
pub struct SpawnedProcess {
children: Vec<Arc<SharedChild>>,
stdout: Mutex<StdoutState>,
stderr_threads: Mutex<Vec<thread::JoinHandle<Vec<u8>>>>,
command: CmdDisplay,
finalized: Mutex<Option<Arc<FinalizedState>>>,
}
struct FinalizedState {
statuses: Vec<ExitStatus>,
stdout: Vec<u8>,
stderr: String,
}
enum StdoutState {
NotTaken,
Cached(ChildStdout),
GivenAway,
}
impl SpawnedProcess {
pub(crate) fn new_single(
child: Arc<SharedChild>,
stderr_thread: Option<thread::JoinHandle<Vec<u8>>>,
command: CmdDisplay,
) -> Self {
Self {
children: vec![child],
stdout: Mutex::new(StdoutState::NotTaken),
stderr_threads: Mutex::new(stderr_thread.into_iter().collect()),
command,
finalized: Mutex::new(None),
}
}
pub(crate) fn new_pipeline(
children: Vec<Arc<SharedChild>>,
stderr_threads: Vec<thread::JoinHandle<Vec<u8>>>,
command: CmdDisplay,
) -> Self {
debug_assert!(!children.is_empty());
Self {
children,
stdout: Mutex::new(StdoutState::NotTaken),
stderr_threads: Mutex::new(stderr_threads),
command,
finalized: Mutex::new(None),
}
}
pub fn command(&self) -> &CmdDisplay {
&self.command
}
pub fn is_pipeline(&self) -> bool {
self.children.len() > 1
}
pub fn take_stdin(&self) -> Option<ChildStdin> {
self.children[0].take_stdin()
}
pub fn take_stdout(&self) -> Option<ChildStdout> {
let mut guard = self.stdout.lock().ok()?;
if matches!(*guard, StdoutState::NotTaken) {
*guard = StdoutState::GivenAway;
self.children.last()?.take_stdout()
} else {
None
}
}
pub fn pids(&self) -> Vec<u32> {
self.children.iter().map(|c| c.id()).collect()
}
pub fn kill(&self) -> io::Result<()> {
let mut first_err: Option<io::Error> = None;
for c in &self.children {
if let Err(e) = c.kill()
&& first_err.is_none()
{
first_err = Some(e);
}
}
match first_err {
None => Ok(()),
Some(e) => Err(e),
}
}
pub fn try_wait(&self) -> Result<Option<RunOutput>, RunError> {
if let Some(state) = self.cached_state() {
return self.reconstruct(&state).map(Some);
}
let mut statuses = Vec::with_capacity(self.children.len());
for c in &self.children {
match c.try_wait() {
Ok(Some(status)) => statuses.push(status),
Ok(None) => return Ok(None),
Err(source) => {
return Err(RunError::Spawn {
command: self.command.clone(),
source,
});
}
}
}
self.finalize_or_cached(statuses).map(Some)
}
pub fn wait(&self) -> Result<RunOutput, RunError> {
if let Some(state) = self.cached_state() {
return self.reconstruct(&state);
}
let mut statuses = Vec::with_capacity(self.children.len());
for c in &self.children {
let status = c.wait().map_err(|source| RunError::Spawn {
command: self.command.clone(),
source,
})?;
statuses.push(status);
}
self.finalize_or_cached(statuses)
}
pub fn wait_timeout(&self, timeout: Duration) -> Result<Option<RunOutput>, RunError> {
if let Some(state) = self.cached_state() {
return self.reconstruct(&state).map(Some);
}
let start = Instant::now();
let mut statuses = Vec::with_capacity(self.children.len());
for c in &self.children {
let remaining = timeout.saturating_sub(start.elapsed());
match c.wait_timeout(remaining) {
Ok(Some(status)) => statuses.push(status),
Ok(None) => return Ok(None),
Err(source) => {
return Err(RunError::Spawn {
command: self.command.clone(),
source,
});
}
}
}
self.finalize_or_cached(statuses).map(Some)
}
fn cached_state(&self) -> Option<Arc<FinalizedState>> {
self.finalized
.lock()
.ok()
.and_then(|g| g.as_ref().map(Arc::clone))
}
fn reconstruct(&self, state: &FinalizedState) -> Result<RunOutput, RunError> {
let chosen = pipefail_status(&state.statuses);
if chosen.success() {
Ok(RunOutput {
stdout: state.stdout.clone(),
stderr: state.stderr.clone(),
})
} else {
Err(RunError::NonZeroExit {
command: self.command.clone(),
status: chosen,
stdout: truncate_suffix(state.stdout.clone()),
stderr: truncate_suffix_string(state.stderr.clone()),
})
}
}
fn finalize_or_cached(
&self,
statuses: Vec<ExitStatus>,
) -> Result<RunOutput, RunError> {
let mut guard = self
.finalized
.lock()
.expect("finalized mutex poisoned");
if let Some(state) = guard.as_ref() {
let state = Arc::clone(state);
drop(guard);
return self.reconstruct(&state);
}
let stderr_bytes = self.join_stderr_threads();
let stderr_str = String::from_utf8_lossy(&stderr_bytes).into_owned();
let stdout_bytes = self.drain_remaining_stdout();
let state = Arc::new(FinalizedState {
statuses,
stdout: stdout_bytes,
stderr: stderr_str,
});
*guard = Some(Arc::clone(&state));
drop(guard);
self.reconstruct(&state)
}
fn join_stderr_threads(&self) -> Vec<u8> {
let Ok(mut guard) = self.stderr_threads.lock() else {
return Vec::new();
};
let mut out = Vec::new();
for t in guard.drain(..) {
if let Ok(bytes) = t.join() {
out.extend(bytes);
}
}
out
}
fn drain_remaining_stdout(&self) -> Vec<u8> {
let Ok(mut guard) = self.stdout.lock() else {
return Vec::new();
};
let mut pipe = match std::mem::replace(&mut *guard, StdoutState::GivenAway) {
StdoutState::NotTaken => match self.children.last().and_then(|c| c.take_stdout()) {
Some(p) => p,
None => return Vec::new(),
},
StdoutState::Cached(p) => p,
StdoutState::GivenAway => return Vec::new(),
};
let mut buf = Vec::new();
let _ = pipe.read_to_end(&mut buf);
buf
}
}
fn pipefail_status(statuses: &[ExitStatus]) -> ExitStatus {
let mut chosen = statuses[0];
for &s in statuses.iter().skip(1) {
if !s.success() || chosen.success() {
chosen = s;
}
}
chosen
}
impl std::fmt::Debug for SpawnedProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SpawnedProcess")
.field("command", &self.command)
.field("pids", &self.pids())
.finish()
}
}
impl Read for SpawnedProcess {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
read_via_handle(self, buf)
}
}
impl Read for &SpawnedProcess {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
read_via_handle(self, buf)
}
}
fn read_via_handle(p: &SpawnedProcess, buf: &mut [u8]) -> io::Result<usize> {
let mut guard = p
.stdout
.lock()
.map_err(|_| io::Error::other("stdout mutex poisoned"))?;
if matches!(*guard, StdoutState::NotTaken) {
match p.children.last().and_then(|c| c.take_stdout()) {
Some(pipe) => *guard = StdoutState::Cached(pipe),
None => *guard = StdoutState::GivenAway,
}
}
match &mut *guard {
StdoutState::Cached(pipe) => pipe.read(buf),
StdoutState::NotTaken | StdoutState::GivenAway => Ok(0),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(unix)]
fn make_status(code: i32) -> ExitStatus {
use std::os::unix::process::ExitStatusExt;
ExitStatus::from_raw(code << 8)
}
#[cfg(windows)]
fn make_status(code: u32) -> ExitStatus {
use std::os::windows::process::ExitStatusExt;
ExitStatus::from_raw(code)
}
#[test]
fn pipefail_rightmost_failure_wins() {
let s = pipefail_status(&[make_status(1), make_status(0), make_status(2)]);
assert_eq!(s.code(), Some(2));
}
#[test]
fn pipefail_only_failure_wins_over_later_success() {
let s = pipefail_status(&[make_status(7), make_status(0), make_status(0)]);
assert_eq!(s.code(), Some(7));
}
#[test]
fn pipefail_all_success() {
let s = pipefail_status(&[make_status(0), make_status(0)]);
assert!(s.success());
}
}