use std::io;
use std::process::ExitStatus;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::process::{Child, ChildStdin, ChildStdout};
use tokio::task::JoinHandle;
use crate::cmd::RunOutput;
use crate::cmd_display::CmdDisplay;
use crate::error::{RunError, truncate_suffix, truncate_suffix_string};
pub struct AsyncSpawnedProcess {
children: Vec<Child>,
stderr_tasks: Vec<JoinHandle<Vec<u8>>>,
command: CmdDisplay,
finalized: Option<Arc<FinalizedState>>,
}
struct FinalizedState {
statuses: Vec<ExitStatus>,
stdout: Vec<u8>,
stderr: String,
}
impl AsyncSpawnedProcess {
pub(crate) fn new_single(
child: Child,
stderr_task: Option<JoinHandle<Vec<u8>>>,
command: CmdDisplay,
) -> Self {
Self {
children: vec![child],
stderr_tasks: stderr_task.into_iter().collect(),
command,
finalized: None,
}
}
pub(crate) fn new_pipeline(
children: Vec<Child>,
stderr_tasks: Vec<JoinHandle<Vec<u8>>>,
command: CmdDisplay,
) -> Self {
debug_assert!(!children.is_empty());
Self {
children,
stderr_tasks,
command,
finalized: None,
}
}
pub fn command(&self) -> &CmdDisplay {
&self.command
}
pub fn is_pipeline(&self) -> bool {
self.children.len() > 1
}
pub fn pids(&self) -> Vec<u32> {
self.children.iter().filter_map(|c| c.id()).collect()
}
pub fn take_stdin(&mut self) -> Option<ChildStdin> {
self.children[0].stdin.take()
}
pub fn take_stdout(&mut self) -> Option<ChildStdout> {
self.children.last_mut()?.stdout.take()
}
pub async fn kill(&mut self) -> io::Result<()> {
let mut first_err: Option<io::Error> = None;
for c in &mut self.children {
if let Err(e) = c.kill().await
&& first_err.is_none()
{
first_err = Some(e);
}
}
match first_err {
None => Ok(()),
Some(e) => Err(e),
}
}
pub async fn try_wait(&mut self) -> Result<Option<RunOutput>, RunError> {
if let Some(state) = self.finalized.clone() {
return self.reconstruct(&state).map(Some);
}
let mut statuses = Vec::with_capacity(self.children.len());
for c in &mut self.children {
match c.try_wait() {
Ok(Some(status)) => statuses.push(status),
Ok(None) => return Ok(None),
Err(source) => {
return Err(RunError::Spawn {
command: self.command.clone(),
source,
});
}
}
}
self.finalize(statuses).await.map(Some)
}
pub async fn wait(&mut self) -> Result<RunOutput, RunError> {
if let Some(state) = self.finalized.clone() {
return self.reconstruct(&state);
}
let mut statuses = Vec::with_capacity(self.children.len());
for c in &mut self.children {
let status = c.wait().await.map_err(|source| RunError::Spawn {
command: self.command.clone(),
source,
})?;
statuses.push(status);
}
self.finalize(statuses).await
}
pub async fn wait_timeout(
&mut self,
timeout: Duration,
) -> Result<Option<RunOutput>, RunError> {
match tokio::time::timeout(timeout, self.wait()).await {
Ok(res) => res.map(Some),
Err(_) => Ok(None),
}
}
fn reconstruct(&self, state: &FinalizedState) -> Result<RunOutput, RunError> {
let chosen = pipefail_status(&state.statuses);
if chosen.success() {
Ok(RunOutput {
stdout: state.stdout.clone(),
stderr: state.stderr.clone(),
})
} else {
Err(RunError::NonZeroExit {
command: self.command.clone(),
status: chosen,
stdout: truncate_suffix(state.stdout.clone()),
stderr: truncate_suffix_string(state.stderr.clone()),
})
}
}
async fn finalize(&mut self, statuses: Vec<ExitStatus>) -> Result<RunOutput, RunError> {
if let Some(state) = self.finalized.clone() {
return self.reconstruct(&state);
}
let stderr_bytes = self.drain_stderr_tasks().await;
let stderr_str = String::from_utf8_lossy(&stderr_bytes).into_owned();
let stdout_bytes = self.drain_stdout().await;
let state = Arc::new(FinalizedState {
statuses,
stdout: stdout_bytes,
stderr: stderr_str,
});
self.finalized = Some(Arc::clone(&state));
self.reconstruct(&state)
}
async fn drain_stderr_tasks(&mut self) -> Vec<u8> {
let mut out = Vec::new();
for t in self.stderr_tasks.drain(..) {
if let Ok(bytes) = t.await {
out.extend(bytes);
}
}
out
}
async fn drain_stdout(&mut self) -> Vec<u8> {
let Some(last) = self.children.last_mut() else {
return Vec::new();
};
let Some(mut pipe) = last.stdout.take() else {
return Vec::new();
};
let mut buf = Vec::new();
let _ = pipe.read_to_end(&mut buf).await;
buf
}
}
impl std::fmt::Debug for AsyncSpawnedProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncSpawnedProcess")
.field("command", &self.command)
.field("pids", &self.pids())
.finish()
}
}
fn pipefail_status(statuses: &[ExitStatus]) -> ExitStatus {
let mut chosen = statuses[0];
for &s in statuses.iter().skip(1) {
if !s.success() || chosen.success() {
chosen = s;
}
}
chosen
}