mod probes;
mod stream;
pub use stream::StdoutLines;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use encoding_rs::Encoding;
use tokio::io::AsyncReadExt;
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
use tokio::task::JoinHandle;
use crate::buffer::OutputBufferPolicy;
#[cfg(feature = "cancellation")]
use crate::error::Error;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::pump::{LineHandler, SharedLines, pump_lines};
use crate::result::ProcessResult;
use crate::stdin::ProcessStdin;
/// Maximum time output-draining tasks are awaited after the process has been
/// driven to exit; tasks still running once this grace elapses are aborted.
const PUMP_TEARDOWN: Duration = Duration::from_secs(5);
/// Outcome of `RunningProcess::finish_lines`: how the process ended plus any
/// output lines that were captured.
struct Finished {
/// Exit code reported by the backend; `None` after a timeout or
/// cancellation kill, or when the backend reported no code.
code: Option<i32>,
/// `true` when the run ended through the timeout path (or a scripted
/// backend was configured to report a timeout).
timed_out: bool,
/// Lines drained from the stdout sink; empty under `CaptureMode::Discard`.
stdout_lines: Vec<String>,
/// Lines drained from the stderr sink; empty under `CaptureMode::Discard`.
stderr_lines: Vec<String>,
}
/// Selects whether `finish_lines` hands back the collected lines or drops them.
#[derive(Clone, Copy)]
enum CaptureMode {
/// Drain both sinks into the returned `Finished`.
Lines,
/// Return empty line vectors; the sinks are not drained.
Discard,
}
/// Everything produced by spawning a real child process; consumed by
/// `RunningProcess::from_spawned` to build the public handle.
pub(crate) struct Spawned {
/// Program name, reported in results, errors and log events.
pub program: String,
/// The spawned tokio child.
pub child: Child,
/// Process group associated with the child, if any; used by the kill and
/// timeout paths.
pub own_group: Option<ProcessGroup>,
/// Child's stdout pipe, if it was captured.
pub stdout: Option<ChildStdout>,
/// Child's stderr pipe, if it was captured.
pub stderr: Option<ChildStderr>,
/// Child's stdin pipe, if it is still open for the caller.
pub stdin: Option<ChildStdin>,
/// Background task whose `io::Result` is inspected after exit
/// (presumably the writer feeding stdin — confirm against the spawner).
pub stdin_task: Option<JoinHandle<std::io::Result<()>>>,
/// Overall run time limit; `None` means no timeout.
pub timeout: Option<Duration>,
/// Grace period for graceful termination on timeout; `None` means the
/// tree is killed immediately.
pub timeout_grace: Option<Duration>,
/// Raw signal number handed to the graceful-termination helpers on timeout.
pub timeout_signal: i32,
/// OS process id, if known.
pub pid: Option<u32>,
/// Encoding passed to the stdout line pump.
pub stdout_encoding: &'static Encoding,
/// Encoding passed to the stderr line pump.
pub stderr_encoding: &'static Encoding,
/// Optional per-line handler passed to the stdout line pump.
pub stdout_handler: Option<LineHandler>,
/// Optional per-line handler passed to the stderr line pump.
pub stderr_handler: Option<LineHandler>,
/// Policy used to construct the line sinks.
pub buffer: OutputBufferPolicy,
/// Exit codes forwarded to `ProcessResult::with_ok_codes`.
pub ok_codes: Vec<i32>,
/// Token that, once cancelled, makes the run kill the process and fail
/// with `Error::Cancelled`.
#[cfg(feature = "cancellation")]
pub cancel_token: Option<tokio_util::sync::CancellationToken>,
}
/// Handle to a started process — either a real OS child or a scripted
/// stand-in — that can be queried while running and consumed to collect its
/// result.
pub struct RunningProcess {
/// Program name, reported in results, errors and log events.
program: String,
/// Real child or scripted stand-in backing this handle.
backend: Backend,
/// Overall run time limit; `None` means no timeout.
timeout: Option<Duration>,
/// Grace period for graceful termination on timeout; `None` means the
/// tree is killed immediately.
timeout_grace: Option<Duration>,
/// Raw signal number handed to the graceful-termination helpers on timeout.
timeout_signal: i32,
/// OS process id; always `None` for scripted processes.
pid: Option<u32>,
/// Encoding passed to the stdout line pump.
stdout_encoding: &'static Encoding,
/// Encoding passed to the stderr line pump.
stderr_encoding: &'static Encoding,
/// Optional per-line handler passed to the stdout line pump.
stdout_handler: Option<LineHandler>,
/// Optional per-line handler passed to the stderr line pump.
stderr_handler: Option<LineHandler>,
/// Policy used to construct the line sinks.
buffer: OutputBufferPolicy,
/// Exit codes forwarded to `ProcessResult::with_ok_codes`.
ok_codes: Vec<i32>,
/// Stdout sink whose running count backs `stdout_line_count`; set once a
/// capturing finisher has started.
stdout_sink: Option<Arc<SharedLines>>,
/// Stderr sink whose running count backs `stderr_line_count`; set once a
/// capturing finisher has started.
stderr_sink: Option<Arc<SharedLines>>,
/// Not assigned in this part of the file; presumably managed by the
/// streaming submodule — confirm there.
stderr_pump: Option<JoinHandle<()>>,
/// Watchdog task aborted once the process exits or the handle is dropped;
/// not assigned in this part of the file.
deadline_task: Option<JoinHandle<()>>,
/// Token that, once cancelled, makes the run kill the process and fail
/// with `Error::Cancelled`.
#[cfg(feature = "cancellation")]
cancel_token: Option<tokio_util::sync::CancellationToken>,
/// Watchdog task aborted once the process exits or the handle is dropped;
/// not assigned in this part of the file.
#[cfg(feature = "cancellation")]
cancel_task: Option<JoinHandle<()>>,
/// Monotonic instant captured when the handle was constructed.
started: Instant,
/// Wall-clock time captured when the handle was constructed.
start_time: SystemTime,
}
/// Type-erased async reader over either a real child pipe or a scripted
/// in-memory stream, so both backends feed the same pumps.
type OutputReader = Box<dyn tokio::io::AsyncRead + Send + Unpin>;
/// What actually stands behind a `RunningProcess`.
enum Backend {
/// A real OS child process and its pipes.
Real(Box<RealProc>),
/// An in-memory stand-in that replays scripted output and exit data.
Scripted(Box<ScriptedProc>),
}
/// State of a real OS child process.
struct RealProc {
/// The tokio child being waited on and killed.
child: Child,
/// Process group associated with the child, if any; shared via `Arc` so
/// the timeout teardown can hold it while the child is awaited.
own_group: Option<Arc<ProcessGroup>>,
/// Stdout pipe until a reader takes it.
stdout_pipe: Option<ChildStdout>,
/// Stderr pipe until a reader takes it.
stderr_pipe: Option<ChildStderr>,
/// Stdin pipe until the caller takes it or the run closes it.
stdin_pipe: Option<ChildStdin>,
/// Background task whose `io::Result` is inspected after exit and which
/// is aborted when the handle is dropped.
stdin_task: Option<JoinHandle<std::io::Result<()>>>,
}
/// In-memory stand-in for a child process that replays canned output and a
/// canned exit outcome.
pub(crate) struct ScriptedProc {
/// Read end of the scripted stdout stream until a reader takes it.
stdout: Option<tokio::io::DuplexStream>,
/// Read end of the scripted stderr stream until a reader takes it.
stderr: Option<tokio::io::DuplexStream>,
/// Tasks writing the scripted text into the streams; aborted on kill.
feeders: Vec<JoinHandle<()>>,
/// Exit code reported when the scripted lifetime elapses.
code: Option<i32>,
/// Timed-out flag reported when the scripted lifetime elapses.
timed_out: bool,
/// Instant at which the process counts as exited; `None` means it never
/// exits on its own.
exit_at: Option<tokio::time::Instant>,
/// Set once the process has been killed.
killed: bool,
}
impl ScriptedProc {
pub(crate) fn new(
stdout_text: String,
stderr_text: String,
code: Option<i32>,
timed_out: bool,
lifetime: Option<Duration>,
line_delay: Option<Duration>,
) -> Self {
let mut feeders = Vec::new();
let mut feed = |text: String| {
let (mut tx, rx) = tokio::io::duplex(64 * 1024);
if text.is_empty() {
return rx;
}
feeders.push(tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
match line_delay {
None => {
let _ = tx.write_all(text.as_bytes()).await;
}
Some(delay) => {
for line in text.split_inclusive('\n') {
tokio::time::sleep(delay).await;
if tx.write_all(line.as_bytes()).await.is_err() {
break;
}
}
}
}
}));
rx
};
let stdout = feed(stdout_text);
let stderr = feed(stderr_text);
Self {
stdout: Some(stdout),
stderr: Some(stderr),
feeders,
code,
timed_out,
exit_at: lifetime.map(|d| tokio::time::Instant::now() + d),
killed: false,
}
}
fn kill(&mut self) {
self.killed = true;
for task in self.feeders.drain(..) {
task.abort();
}
}
}
impl Backend {
    /// Returns the process group of a real child, if it has one; scripted
    /// processes never have a group.
    fn own_group(&self) -> Option<&Arc<ProcessGroup>> {
        let Backend::Real(real) = self else {
            return None;
        };
        real.own_group.as_ref()
    }

    /// Takes the stdout stream as a type-erased reader; `None` if it was
    /// never captured or has already been taken.
    fn take_stdout_reader(&mut self) -> Option<OutputReader> {
        match self {
            Backend::Real(real) => {
                let pipe = real.stdout_pipe.take()?;
                Some(Box::new(pipe) as OutputReader)
            }
            Backend::Scripted(scripted) => {
                let stream = scripted.stdout.take()?;
                Some(Box::new(stream) as OutputReader)
            }
        }
    }

    /// Takes the stderr stream as a type-erased reader; `None` if it was
    /// never captured or has already been taken.
    fn take_stderr_reader(&mut self) -> Option<OutputReader> {
        match self {
            Backend::Real(real) => {
                let pipe = real.stderr_pipe.take()?;
                Some(Box::new(pipe) as OutputReader)
            }
            Backend::Scripted(scripted) => {
                let stream = scripted.stderr.take()?;
                Some(Box::new(stream) as OutputReader)
            }
        }
    }
}
impl RunningProcess {
pub(crate) fn from_spawned(s: Spawned) -> Self {
Self {
program: s.program,
backend: Backend::Real(Box::new(RealProc {
child: s.child,
own_group: s.own_group.map(Arc::new),
stdout_pipe: s.stdout,
stderr_pipe: s.stderr,
stdin_pipe: s.stdin,
stdin_task: s.stdin_task,
})),
timeout: s.timeout,
timeout_grace: s.timeout_grace,
timeout_signal: s.timeout_signal,
pid: s.pid,
stdout_encoding: s.stdout_encoding,
stderr_encoding: s.stderr_encoding,
stdout_handler: s.stdout_handler,
stderr_handler: s.stderr_handler,
buffer: s.buffer,
ok_codes: s.ok_codes,
stdout_sink: None,
stderr_sink: None,
stderr_pump: None,
deadline_task: None,
#[cfg(feature = "cancellation")]
cancel_token: s.cancel_token,
#[cfg(feature = "cancellation")]
cancel_task: None,
started: Instant::now(),
start_time: SystemTime::now(),
}
}
pub(crate) fn from_scripted(command: &crate::command::Command, scripted: ScriptedProc) -> Self {
Self {
program: command.program_name(),
backend: Backend::Scripted(Box::new(scripted)),
timeout: command.configured_timeout(),
timeout_grace: command.configured_timeout_grace(),
timeout_signal: command.timeout_signal_raw(),
pid: None,
stdout_encoding: command.out_encoding(),
stderr_encoding: command.err_encoding(),
stdout_handler: command.stdout_handler(),
stderr_handler: command.stderr_handler(),
buffer: command.output_buffer_policy(),
ok_codes: command.ok_codes_vec(),
stdout_sink: None,
stderr_sink: None,
stderr_pump: None,
deadline_task: None,
#[cfg(feature = "cancellation")]
cancel_token: command.cancel_token(),
#[cfg(feature = "cancellation")]
cancel_task: None,
started: Instant::now(),
start_time: SystemTime::now(),
}
}
pub(crate) fn attach_group(&mut self, group: ProcessGroup) {
if let Backend::Real(real) = &mut self.backend {
real.own_group = Some(Arc::new(group));
}
}
pub(crate) fn take_stdout_pipe(&mut self) -> Option<ChildStdout> {
match &mut self.backend {
Backend::Real(real) => real.stdout_pipe.take(),
Backend::Scripted(_) => None,
}
}
pub(crate) fn program_name(&self) -> &str {
&self.program
}
}
impl std::fmt::Debug for RunningProcess {
    /// Prints only the program, pid and timeout; the remaining fields are
    /// elided as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RunningProcess");
        out.field("program", &self.program);
        out.field("pid", &self.pid);
        out.field("timeout", &self.timeout);
        out.finish_non_exhaustive()
    }
}
impl RunningProcess {
/// Returns the OS process id, or `None` when it is unknown (scripted
/// processes never have one).
pub fn pid(&self) -> Option<u32> {
self.pid
}
/// Returns the wall-clock time captured when this handle was constructed.
pub fn start_time(&self) -> SystemTime {
self.start_time
}
/// Returns how long ago this handle was constructed, measured on the
/// monotonic clock.
pub fn elapsed(&self) -> Duration {
self.started.elapsed()
}
/// Queries the current CPU time of the process via
/// `crate::sys::process_metrics`; `None` when there is no pid or the metric
/// is unavailable.
#[cfg(feature = "stats")]
pub fn cpu_time(&self) -> Option<Duration> {
    let pid = self.pid?;
    crate::sys::process_metrics(pid).cpu_time
}
/// Queries the peak memory of the process via `crate::sys::process_metrics`;
/// `None` when there is no pid or the metric is unavailable.
#[cfg(feature = "stats")]
pub fn peak_memory_bytes(&self) -> Option<u64> {
    let pid = self.pid?;
    crate::sys::process_metrics(pid).peak_memory_bytes
}
/// Number of stdout lines counted by the sink so far; `0` while no sink is
/// attached.
pub fn stdout_line_count(&self) -> usize {
    match &self.stdout_sink {
        Some(sink) => sink.count(),
        None => 0,
    }
}
/// Number of stderr lines counted by the sink so far; `0` while no sink is
/// attached.
pub fn stderr_line_count(&self) -> usize {
    match &self.stderr_sink {
        Some(sink) => sink.count(),
        None => 0,
    }
}
/// Takes ownership of the child's stdin as a `ProcessStdin`.
///
/// Returns `None` for scripted processes, when stdin was not piped, or when
/// it has already been taken.
pub fn standard_input(&mut self) -> Option<ProcessStdin> {
    let Backend::Real(real) = &mut self.backend else {
        return None;
    };
    let pipe = real.stdin_pipe.take()?;
    Some(ProcessStdin::new(pipe))
}
pub async fn output_string(mut self) -> Result<ProcessResult<String>> {
let finished = self
.finish_lines(CaptureMode::Lines, true, || {})
.await?;
let truncated = self
.stdout_sink
.as_ref()
.is_some_and(|s| s.count() > finished.stdout_lines.len())
|| self
.stderr_sink
.as_ref()
.is_some_and(|s| s.count() > finished.stderr_lines.len());
let duration = self.started.elapsed();
Ok(ProcessResult::new(
self.program.clone(),
finished.stdout_lines.join("\n"),
finished.stderr_lines.join("\n"),
finished.code,
finished.timed_out,
self.timeout,
)
.with_duration(duration)
.with_truncated(truncated)
.with_ok_codes(self.ok_codes.clone()))
}
/// Consumes the process and returns stdout as raw bytes and stderr as
/// decoded lines joined with `\n`.
///
/// Stdout is read in 8 KiB chunks by a background task into a shared buffer;
/// a read error simply ends the read, and whatever was collected is returned.
/// The result is flagged as truncated when the stderr sink counted more
/// lines than it retained.
///
/// # Errors
/// Propagates failures from driving the process to exit, including the
/// cancellation error when the `cancellation` feature is enabled and the
/// token has fired.
pub async fn output_bytes(mut self) -> Result<ProcessResult<Vec<u8>>> {
let stderr_sink = SharedLines::new(&self.buffer);
// Stderr still goes through the line pump (decoding + handler).
let err_pump = self.backend.take_stderr_reader().map(|pipe| {
tokio::spawn(pump_lines(
pipe,
self.stderr_encoding,
self.stderr_handler.clone(),
stderr_sink.clone(),
))
});
// Expose the sink so `stderr_line_count` reflects progress.
self.stderr_sink = Some(stderr_sink.clone());
let mut stdout_pipe = self.backend.take_stdout_reader();
// Shared buffer rather than a task return value, so bytes read before an
// abort of the reader task are still available.
let out_buf = Arc::new(std::sync::Mutex::new(Vec::new()));
let out_task = {
let out_buf = out_buf.clone();
tokio::spawn(async move {
if let Some(pipe) = &mut stdout_pipe {
let mut chunk = [0u8; 8 * 1024];
loop {
match pipe.read(&mut chunk).await {
// EOF or read error: stop reading, keep what we have.
Ok(0) | Err(_) => break,
Ok(n) => out_buf
.lock()
.expect("stdout buffer poisoned")
.extend_from_slice(&chunk[..n]),
}
}
}
})
};
let (code, timed_out) = self.drive_to_exit().await?;
self.observe_stdin_task().await;
// Give the reader a bounded grace to hit EOF; abort it if it overruns
// that grace.
let abort = out_task.abort_handle();
if tokio::time::timeout(PUMP_TEARDOWN, out_task).await.is_err() {
abort.abort();
}
let stdout = std::mem::take(&mut *out_buf.lock().expect("stdout buffer poisoned"));
join_pumps(err_pump.into_iter().collect()).await;
// With the `cancellation` feature this turns a fired token into an error.
let (code, timed_out) = self.checked_outcome((code, timed_out))?;
let stderr_lines = stderr_sink.drain();
let truncated = stderr_sink.count() > stderr_lines.len();
let duration = self.started.elapsed();
Ok(ProcessResult::new(
self.program.clone(),
stdout,
stderr_lines.join("\n"),
code,
timed_out,
self.timeout,
)
.with_duration(duration)
.with_truncated(truncated)
.with_ok_codes(self.ok_codes.clone()))
}
/// Consumes the process, waits for it to finish, and returns its exit code.
///
/// Output is still pumped (so line handlers run) but the collected lines are
/// not returned.
///
/// # Errors
/// Propagates failures from driving the process to exit, including the
/// cancellation error when the `cancellation` feature is enabled and the
/// token has fired.
pub async fn wait(mut self) -> Result<Option<i32>> {
    let finished = self
        .finish_lines(CaptureMode::Discard, false, || {})
        .await?;
    Ok(finished.code)
}
/// Waits on the backend directly — no timeout, cancellation or output
/// pumping — and returns the exit code.
pub(crate) async fn wait_exit(&mut self) -> Result<Option<i32>> {
    let (code, _timed_out) = self.backend_wait().await?;
    Ok(code)
}
/// Consumes the process, sampling its CPU time and peak memory every `every`
/// (clamped to at least 1 ms) until it exits, and returns the profile.
///
/// `cpu_time` is the last reading observed, `peak_memory_bytes` the maximum
/// reading observed, and `samples` the number of sampling ticks taken. When
/// the process has no pid, no sampler runs and all three stay empty/zero.
/// Output is pumped but not returned.
///
/// The sampler task is stopped as soon as the process exits. It is also
/// stopped if this future is dropped before completion, so a cancelled
/// `profile` call never leaves a task polling the pid indefinitely.
///
/// # Errors
/// Propagates failures from driving the process to exit, including the
/// cancellation error when the `cancellation` feature is enabled and the
/// token has fired.
#[cfg(feature = "stats")]
pub async fn profile(mut self, every: Duration) -> Result<crate::stats::RunProfile> {
    use std::sync::{Arc, Mutex};

    /// Running totals shared between the sampler task and this method.
    #[derive(Default)]
    struct Acc {
        cpu_time: Option<Duration>,
        peak_memory_bytes: Option<u64>,
        samples: usize,
    }

    /// Owns the sampler task and aborts it on `stop()` or on drop, whichever
    /// comes first. Aborting an already-finished task is harmless.
    struct SamplerGuard(Option<JoinHandle<()>>);

    impl SamplerGuard {
        fn stop(&self) {
            if let Some(task) = &self.0 {
                task.abort();
            }
        }
    }

    impl Drop for SamplerGuard {
        fn drop(&mut self) {
            self.stop();
        }
    }

    // A zero interval would make `tokio::time::interval` panic.
    let every = every.max(Duration::from_millis(1));
    let started = self.started;
    let acc = Arc::new(Mutex::new(Acc::default()));
    let sampler = SamplerGuard(self.pid.map(|pid| {
        let acc = Arc::clone(&acc);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(every);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                ticker.tick().await;
                let metrics = crate::sys::process_metrics(pid);
                // A poisoned lock just skips the sample.
                if let Ok(mut acc) = acc.lock() {
                    acc.samples += 1;
                    if let Some(cpu) = metrics.cpu_time {
                        acc.cpu_time = Some(cpu);
                    }
                    if let Some(peak) = metrics.peak_memory_bytes {
                        acc.peak_memory_bytes =
                            Some(acc.peak_memory_bytes.map_or(peak, |prev| prev.max(peak)));
                    }
                }
            }
        })
    }));
    // Stop sampling the moment the process is gone, before pumps are joined,
    // so the pid is not polled after exit.
    let exit_code = self
        .finish_lines(CaptureMode::Discard, false, || sampler.stop())
        .await?
        .code;
    let duration = started.elapsed();
    let (cpu_time, peak_memory_bytes, samples) = match acc.lock() {
        Ok(acc) => (acc.cpu_time, acc.peak_memory_bytes, acc.samples),
        Err(_) => (None, None, 0),
    };
    Ok(crate::stats::RunProfile {
        exit_code,
        duration,
        cpu_time,
        peak_memory_bytes,
        samples,
    })
}
async fn finish_lines(
&mut self,
capture: CaptureMode,
expose_counts: bool,
on_exit: impl FnOnce(),
) -> Result<Finished> {
let stdout_sink = SharedLines::new(&self.buffer);
let stderr_sink = SharedLines::new(&self.buffer);
let pumps = self.spawn_line_pumps(&stdout_sink, &stderr_sink);
if expose_counts {
self.stdout_sink = Some(stdout_sink.clone());
self.stderr_sink = Some(stderr_sink.clone());
}
let outcome = self.drive_to_exit().await;
on_exit();
let outcome = outcome?;
self.observe_stdin_task().await;
join_pumps(pumps).await;
let (code, timed_out) = self.checked_outcome(outcome)?;
let (stdout_lines, stderr_lines) = match capture {
CaptureMode::Lines => (stdout_sink.drain(), stderr_sink.drain()),
CaptureMode::Discard => (Vec::new(), Vec::new()),
};
Ok(Finished {
code,
timed_out,
stdout_lines,
stderr_lines,
})
}
/// Spawns one line-pump task per output stream that is still available
/// (stdout first, then stderr) and returns their handles.
fn spawn_line_pumps(
    &mut self,
    stdout_sink: &Arc<SharedLines>,
    stderr_sink: &Arc<SharedLines>,
) -> Vec<JoinHandle<()>> {
    let stdout_pump = self.backend.take_stdout_reader().map(|reader| {
        tokio::spawn(pump_lines(
            reader,
            self.stdout_encoding,
            self.stdout_handler.clone(),
            Arc::clone(stdout_sink),
        ))
    });
    let stderr_pump = self.backend.take_stderr_reader().map(|reader| {
        tokio::spawn(pump_lines(
            reader,
            self.stderr_encoding,
            self.stderr_handler.clone(),
            Arc::clone(stderr_sink),
        ))
    });
    stdout_pump.into_iter().chain(stderr_pump).collect()
}
/// Passes `outcome` through unchanged, except that with the `cancellation`
/// feature a fired token turns it into the cancellation error.
fn checked_outcome(&self, outcome: (Option<i32>, bool)) -> Result<(Option<i32>, bool)> {
    #[cfg(feature = "cancellation")]
    {
        if let Some(err) = self.cancelled_error() {
            return Err(err);
        }
    }
    Ok(outcome)
}
/// Collects the result of the stdin task if — and only if — it has already
/// finished, logging any failure other than a broken pipe.
///
/// An unfinished task is left in place (it is aborted when the handle is
/// dropped). Scripted processes have no such task.
async fn observe_stdin_task(&mut self) {
    let Backend::Real(real) = &mut self.backend else {
        return;
    };
    let already_done = real
        .stdin_task
        .as_ref()
        .is_some_and(|task| task.is_finished());
    if !already_done {
        return;
    }
    let Some(task) = real.stdin_task.take() else {
        return;
    };
    // The task is finished, so this await resolves without waiting.
    if let Ok(Err(e)) = task.await {
        if !is_broken_pipe(&e) {
            #[cfg(feature = "tracing")]
            tracing::warn!(
                target: "processkit",
                program = %self.program,
                error = %e,
                "stdin writer failed"
            );
            #[cfg(not(feature = "tracing"))]
            let _ = e;
        }
    }
}
fn abort_watchdogs(&mut self) {
if let Some(task) = self.deadline_task.take() {
task.abort();
}
#[cfg(feature = "cancellation")]
if let Some(task) = self.cancel_task.take() {
task.abort();
}
}
/// Closes any stdin pipe still held, waits for the process to end (subject
/// to timeout and, if enabled, cancellation), then aborts the watchdogs.
///
/// Returns `(exit_code, timed_out)`.
///
/// # Errors
/// Propagates a failure from the underlying wait; the watchdogs are left
/// untouched in that case.
async fn drive_to_exit(&mut self) -> Result<(Option<i32>, bool)> {
    // Dropping the pipe closes the child's stdin, so a child reading until
    // EOF can finish.
    match &mut self.backend {
        Backend::Real(real) => real.stdin_pipe = None,
        Backend::Scripted(_) => {}
    }
    let (code, timed_out) = self.drive_to_exit_inner().await?;
    self.abort_watchdogs();
    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "processkit",
        program = %self.program,
        code = ?code,
        timed_out,
        elapsed_ms = self.started.elapsed().as_millis() as u64,
        "process exited"
    );
    Ok((code, timed_out))
}
/// Waits for the backend to finish, with no timeout or cancellation.
///
/// * Real child: awaits the OS exit status and returns `(code, false)`.
/// * Scripted: returns `(None, false)` if already killed; otherwise sleeps
///   until the scripted exit instant and returns the scripted outcome, or
///   never resolves when no exit instant is set.
///
/// # Errors
/// Propagates the I/O error from waiting on a real child.
async fn backend_wait(&mut self) -> Result<(Option<i32>, bool)> {
    let scripted = match &mut self.backend {
        Backend::Real(real) => {
            let status = real.child.wait().await?;
            return Ok((status.code(), false));
        }
        Backend::Scripted(scripted) => scripted,
    };
    if scripted.killed {
        return Ok((None, false));
    }
    let Some(exit_at) = scripted.exit_at else {
        // No scripted lifetime: behave like a process that never exits.
        return std::future::pending::<Result<(Option<i32>, bool)>>().await;
    };
    tokio::time::sleep_until(exit_at).await;
    Ok((scripted.code, scripted.timed_out))
}
/// Waits for the process to exit, enforcing the configured timeout (build
/// without the `cancellation` feature).
///
/// Returns the backend's outcome on a normal exit, or `(None, true)` after
/// the timeout elapsed and the process was torn down.
#[cfg(not(feature = "cancellation"))]
async fn drive_to_exit_inner(&mut self) -> Result<(Option<i32>, bool)> {
    let Some(limit) = self.timeout else {
        return self.backend_wait().await;
    };
    // The wait future is dropped at the end of this statement, releasing its
    // borrow of `self` before the teardown below.
    let waited = tokio::time::timeout(limit, self.backend_wait()).await;
    if let Ok(outcome) = waited {
        return outcome;
    }
    #[cfg(feature = "tracing")]
    tracing::warn!(
        target: "processkit",
        program = %self.program,
        timeout_ms = limit.as_millis() as u64,
        "timeout elapsed; killing the tree"
    );
    self.teardown_on_timeout().await;
    Ok((None, true))
}
/// Waits for the process to exit, racing that wait against the cancellation
/// token and the configured timeout (build with the `cancellation` feature).
///
/// Returns the backend's outcome on a normal exit, `(None, false)` after a
/// cancellation kill, and `(None, true)` after a timeout teardown. The
/// cancellation itself is turned into an error later by `checked_outcome`.
#[cfg(feature = "cancellation")]
async fn drive_to_exit_inner(&mut self) -> Result<(Option<i32>, bool)> {
let limit = self.timeout;
// Clone the token so the `cancelled` future does not borrow `self`, which
// the select arms below need mutably.
let token = self.cancel_token.clone();
// Resolves when the token fires; never resolves without a token.
let cancelled = async {
match &token {
Some(token) => token.cancelled().await,
None => std::future::pending::<()>().await,
}
};
// Resolves when the timeout elapses; never resolves without a timeout.
let deadline = async {
match limit {
Some(limit) => tokio::time::sleep(limit).await,
None => std::future::pending::<()>().await,
}
};
tokio::select! {
// Normal exit: hand back whatever the backend reported.
outcome = self.backend_wait() => outcome,
() = cancelled => {
#[cfg(feature = "tracing")]
tracing::debug!(
target: "processkit",
program = %self.program,
"cancellation fired; killing the tree"
);
self.kill_tree().await;
Ok((None, false))
}
() = deadline => {
#[cfg(feature = "tracing")]
tracing::warn!(
target: "processkit",
program = %self.program,
timeout_ms = limit.map(|l| l.as_millis() as u64).unwrap_or(0),
"timeout elapsed; killing the tree"
);
self.teardown_on_timeout().await;
Ok((None, true))
}
}
}
/// Forcefully ends the process: kills a real child, terminates its process
/// group if it has one, and waits for the child to be reaped. For a scripted
/// process this just marks it killed.
///
/// Every step is best-effort; failures are ignored.
async fn kill_tree(&mut self) {
    let real = match &mut self.backend {
        Backend::Scripted(scripted) => return scripted.kill(),
        Backend::Real(real) => real,
    };
    let _ = real.child.start_kill();
    if let Some(group) = &real.own_group {
        let _ = group.terminate_all();
    }
    let _ = real.child.wait().await;
}
/// Ends the process after its timeout elapsed.
///
/// Without a configured grace period this is a plain `kill_tree`. With one,
/// a real child is terminated gracefully — through its process group when it
/// has one, otherwise by pid — using the configured signal, while the child
/// is awaited concurrently. A scripted process is simply marked killed.
async fn teardown_on_timeout(&mut self) {
    let grace = match self.timeout_grace {
        Some(grace) => grace,
        None => return self.kill_tree().await,
    };
    let signal = self.timeout_signal;
    let real = match &mut self.backend {
        Backend::Scripted(scripted) => return scripted.kill(),
        Backend::Real(real) => real,
    };
    let pid = real.child.id();
    // Clone the `Arc` so the teardown future does not borrow `real`, which
    // `child.wait()` needs mutably at the same time.
    let group = real.own_group.clone();
    let teardown = async {
        if let Some(group) = &group {
            let _ = group.graceful_terminate(grace, signal).await;
        } else {
            crate::running::stream::graceful_kill_pid(pid, grace, signal).await;
        }
    };
    let _ = tokio::join!(teardown, real.child.wait());
}
/// Non-blocking check of whether the process has already exited.
///
/// A real child counts as exited only when `try_wait` succeeds with a
/// status. A scripted process counts as exited once killed or once its
/// scripted exit instant has passed.
fn has_exited_now(&mut self) -> bool {
    match &mut self.backend {
        Backend::Real(real) => real
            .child
            .try_wait()
            .is_ok_and(|status| status.is_some()),
        Backend::Scripted(scripted) => {
            if scripted.killed {
                return true;
            }
            match scripted.exit_at {
                Some(at) => tokio::time::Instant::now() >= at,
                None => false,
            }
        }
    }
}
/// Returns `Error::Cancelled` for this program when a cancellation token is
/// configured and has fired; `None` otherwise.
#[cfg(feature = "cancellation")]
fn cancelled_error(&self) -> Option<Error> {
    let token = self.cancel_token.as_ref()?;
    token.is_cancelled().then(|| Error::Cancelled {
        program: self.program.clone(),
    })
}
/// Requests that the process be killed without waiting for it to exit.
///
/// For a scripted process this marks it killed and always succeeds.
///
/// # Errors
/// Returns the I/O error reported when the kill request for a real child
/// fails.
pub fn start_kill(&mut self) -> Result<()> {
    match &mut self.backend {
        Backend::Scripted(scripted) => {
            scripted.kill();
            Ok(())
        }
        Backend::Real(real) => Ok(real.child.start_kill()?),
    }
}
}
impl Drop for RunningProcess {
    /// Stops every background task owned by the handle: the stdin task of a
    /// real child (or the feeders of a scripted one) and the watchdogs.
    fn drop(&mut self) {
        match &mut self.backend {
            Backend::Scripted(scripted) => scripted.kill(),
            Backend::Real(real) => {
                if let Some(writer) = real.stdin_task.take() {
                    writer.abort();
                }
            }
        }
        // Same aborts as after a normal exit.
        self.abort_watchdogs();
    }
}
/// Reports whether `e` means the reading end of a pipe has gone away.
///
/// The portable check is `ErrorKind::BrokenPipe`. On Windows the raw Win32
/// codes `ERROR_BROKEN_PIPE` and `ERROR_NO_DATA` are accepted as well. The
/// raw codes are only consulted on Windows because the same numbers denote
/// unrelated errors elsewhere (109 is `ETOOMANYREFS` on Linux).
fn is_broken_pipe(e: &std::io::Error) -> bool {
    /// Win32 `ERROR_BROKEN_PIPE`: "The pipe has been ended."
    const ERROR_BROKEN_PIPE: i32 = 109;
    /// Win32 `ERROR_NO_DATA`: "The pipe is being closed."
    const ERROR_NO_DATA: i32 = 232;

    if e.kind() == std::io::ErrorKind::BrokenPipe {
        return true;
    }
    cfg!(windows) && matches!(e.raw_os_error(), Some(ERROR_BROKEN_PIPE | ERROR_NO_DATA))
}
/// Awaits the given output pump tasks in order, giving them `PUMP_TEARDOWN`
/// in total to finish; any still running after that are aborted.
///
/// With the `tracing` feature, a task that ended abnormally and an overrun
/// of the grace period are each logged as warnings.
async fn join_pumps(tasks: Vec<JoinHandle<()>>) {
    if tasks.is_empty() {
        return;
    }
    // Grab abort handles up front: the join handles are consumed below.
    let abort_handles: Vec<_> = tasks.iter().map(|task| task.abort_handle()).collect();
    let drain_all = async move {
        for pump in tasks {
            let joined = pump.await;
            #[cfg(feature = "tracing")]
            if let Err(e) = joined {
                tracing::warn!(target: "processkit", error = %e, "output pump task ended abnormally");
            }
            #[cfg(not(feature = "tracing"))]
            let _ = joined;
        }
    };
    let overran = tokio::time::timeout(PUMP_TEARDOWN, drain_all)
        .await
        .is_err();
    if !overran {
        return;
    }
    #[cfg(feature = "tracing")]
    tracing::warn!(
        target: "processkit",
        timeout_ms = PUMP_TEARDOWN.as_millis() as u64,
        aborted = abort_handles.len(),
        "output pumps overran teardown grace; aborting stragglers"
    );
    abort_handles.iter().for_each(|handle| handle.abort());
}