use std::io::{self, Read};
use std::process::{ChildStdin, ChildStdout, ExitStatus};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use shared_child::SharedChild;
use crate::cmd::RunOutput;
use crate::cmd_display::CmdDisplay;
use crate::error::{RunError, truncate_suffix, truncate_suffix_string};
pub struct SpawnedProcess {
child: Arc<SharedChild>,
stdout: Mutex<StdoutState>,
stderr_thread: Mutex<Option<thread::JoinHandle<Vec<u8>>>>,
command: CmdDisplay,
}
enum StdoutState {
NotTaken,
Cached(ChildStdout),
GivenAway,
}
impl SpawnedProcess {
pub(crate) fn new(
child: Arc<SharedChild>,
stderr_thread: Option<thread::JoinHandle<Vec<u8>>>,
command: CmdDisplay,
) -> Self {
Self {
child,
stdout: Mutex::new(StdoutState::NotTaken),
stderr_thread: Mutex::new(stderr_thread),
command,
}
}
pub fn command(&self) -> &CmdDisplay {
&self.command
}
pub fn take_stdin(&self) -> Option<ChildStdin> {
self.child.take_stdin()
}
pub fn take_stdout(&self) -> Option<ChildStdout> {
let mut guard = self.stdout.lock().ok()?;
if matches!(*guard, StdoutState::NotTaken) {
*guard = StdoutState::GivenAway;
self.child.take_stdout()
} else {
None
}
}
pub fn pids(&self) -> Vec<u32> {
vec![self.child.id()]
}
pub fn kill(&self) -> io::Result<()> {
self.child.kill()
}
pub fn try_wait(&self) -> Result<Option<RunOutput>, RunError> {
match self.child.try_wait() {
Ok(Some(status)) => self.finalize(status).map(Some),
Ok(None) => Ok(None),
Err(source) => Err(RunError::Spawn {
command: self.command.clone(),
source,
}),
}
}
pub fn wait(&self) -> Result<RunOutput, RunError> {
let status = self.child.wait().map_err(|source| RunError::Spawn {
command: self.command.clone(),
source,
})?;
self.finalize(status)
}
pub fn wait_timeout(&self, timeout: Duration) -> Result<Option<RunOutput>, RunError> {
match self.child.wait_timeout(timeout) {
Ok(Some(status)) => self.finalize(status).map(Some),
Ok(None) => Ok(None),
Err(source) => Err(RunError::Spawn {
command: self.command.clone(),
source,
}),
}
}
fn finalize(&self, status: ExitStatus) -> Result<RunOutput, RunError> {
let stderr_bytes = self
.stderr_thread
.lock()
.ok()
.and_then(|mut s| s.take())
.map(|h| h.join().unwrap_or_default())
.unwrap_or_default();
let stderr_str = String::from_utf8_lossy(&stderr_bytes).into_owned();
let stdout_bytes = self.drain_remaining_stdout();
if status.success() {
Ok(RunOutput {
stdout: stdout_bytes,
stderr: stderr_str,
})
} else {
Err(RunError::NonZeroExit {
command: self.command.clone(),
status,
stdout: truncate_suffix(stdout_bytes),
stderr: truncate_suffix_string(stderr_str),
})
}
}
fn drain_remaining_stdout(&self) -> Vec<u8> {
let Ok(mut guard) = self.stdout.lock() else {
return Vec::new();
};
let mut pipe = match std::mem::replace(&mut *guard, StdoutState::GivenAway) {
StdoutState::NotTaken => match self.child.take_stdout() {
Some(p) => p,
None => return Vec::new(),
},
StdoutState::Cached(p) => p,
StdoutState::GivenAway => return Vec::new(),
};
let mut buf = Vec::new();
let _ = pipe.read_to_end(&mut buf);
buf
}
}
impl std::fmt::Debug for SpawnedProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SpawnedProcess")
.field("command", &self.command)
.field("pid", &self.child.id())
.finish()
}
}
impl Read for SpawnedProcess {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
read_via_handle(self, buf)
}
}
impl Read for &SpawnedProcess {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
read_via_handle(self, buf)
}
}
fn read_via_handle(p: &SpawnedProcess, buf: &mut [u8]) -> io::Result<usize> {
let mut guard = p
.stdout
.lock()
.map_err(|_| io::Error::other("stdout mutex poisoned"))?;
if matches!(*guard, StdoutState::NotTaken) {
match p.child.take_stdout() {
Some(pipe) => *guard = StdoutState::Cached(pipe),
None => *guard = StdoutState::GivenAway,
}
}
match &mut *guard {
StdoutState::Cached(pipe) => pipe.read(buf),
StdoutState::NotTaken | StdoutState::GivenAway => Ok(0),
}
}