mod probes;
mod stream;
pub use stream::StdoutLines;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use encoding_rs::Encoding;
use tokio::io::AsyncReadExt;
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
use tokio::task::JoinHandle;
use crate::buffer::OutputBufferPolicy;
#[cfg(feature = "cancellation")]
use crate::error::Error;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::pump::{LineHandler, SharedLines, pump_lines};
use crate::result::ProcessResult;
use crate::stdin::ProcessStdin;
/// Upper bound on how long output-reader tasks are awaited once the child has
/// been driven to exit; readers still running after this are aborted.
const PUMP_TEARDOWN: Duration = Duration::from_secs(5);
/// Everything collected at spawn time that [`RunningProcess::from_spawned`]
/// needs to build a [`RunningProcess`]. Each field is moved over as-is, except
/// `own_group`, which is wrapped in an `Arc`.
pub(crate) struct Spawned {
/// Program name; later used in results and in the `Debug` output.
pub program: String,
/// Handle to the spawned child process.
pub child: Child,
/// Process group associated with this child, if any.
pub own_group: Option<ProcessGroup>,
/// Child's stdout pipe, if one is available.
pub stdout: Option<ChildStdout>,
/// Child's stderr pipe, if one is available.
pub stderr: Option<ChildStderr>,
/// Child's stdin pipe, if one is available.
pub stdin: Option<ChildStdin>,
/// Background task tied to stdin; aborted when the `RunningProcess` is dropped.
/// NOTE(review): presumably the task that feeds stdin — confirm at the spawn site.
pub stdin_task: Option<JoinHandle<std::io::Result<()>>>,
/// Optional limit on how long the process may run before its tree is killed.
pub timeout: Option<Duration>,
/// OS process id, if known.
pub pid: Option<u32>,
/// Encoding handed to the stdout line pump.
pub stdout_encoding: &'static Encoding,
/// Encoding handed to the stderr line pump.
pub stderr_encoding: &'static Encoding,
/// Optional per-line handler handed to the stdout line pump.
pub stdout_handler: Option<LineHandler>,
/// Optional per-line handler handed to the stderr line pump.
pub stderr_handler: Option<LineHandler>,
/// Policy from which the line buffers (`SharedLines`) are created.
pub buffer: OutputBufferPolicy,
/// Token whose cancellation causes the process tree to be killed.
#[cfg(feature = "cancellation")]
pub cancel_token: Option<tokio_util::sync::CancellationToken>,
}
/// A spawned child process together with its pipes, limits and helper tasks.
///
/// The consuming methods (`output_string`, `output_bytes`, `wait` and, with the
/// `stats` feature, `profile`) drive the child to exit and collect its output.
pub struct RunningProcess {
/// Program name, used in results, errors and `Debug` output.
program: String,
/// Handle to the child process.
child: Child,
/// Process group terminated together with the child by `kill_tree`.
own_group: Option<Arc<ProcessGroup>>,
/// Stdout pipe; taken by whichever reader consumes it first.
stdout_pipe: Option<ChildStdout>,
/// Stderr pipe; taken by whichever reader consumes it first.
stderr_pipe: Option<ChildStderr>,
/// Stdin pipe; handed out once by `standard_input`.
stdin_pipe: Option<ChildStdin>,
/// Background task tied to stdin; aborted on drop.
stdin_task: Option<JoinHandle<std::io::Result<()>>>,
/// Optional run-time limit enforced while driving the child to exit.
timeout: Option<Duration>,
/// OS process id, if known.
pid: Option<u32>,
/// Encoding handed to the stdout line pump.
stdout_encoding: &'static Encoding,
/// Encoding handed to the stderr line pump.
stderr_encoding: &'static Encoding,
/// Optional per-line handler for stdout.
stdout_handler: Option<LineHandler>,
/// Optional per-line handler for stderr.
stderr_handler: Option<LineHandler>,
/// Policy from which line buffers are created.
buffer: OutputBufferPolicy,
/// Stdout line buffer once collection has started; backs `stdout_line_count`.
stdout_sink: Option<Arc<SharedLines>>,
/// Stderr line buffer once collection has started; backs `stderr_line_count`.
stderr_sink: Option<Arc<SharedLines>>,
/// Handle to a stderr pump task. Never set in the code visible here.
/// NOTE(review): presumably managed by a sibling module — confirm.
stderr_pump: Option<JoinHandle<()>>,
/// Watchdog task aborted after the child has exited and on drop. Never set
/// in the code visible here.
deadline_task: Option<JoinHandle<()>>,
/// Token checked after exit to decide whether the run counts as cancelled.
#[cfg(feature = "cancellation")]
cancel_token: Option<tokio_util::sync::CancellationToken>,
/// Watchdog task aborted after the child has exited and on drop. Never set
/// in the code visible here.
#[cfg(feature = "cancellation")]
cancel_task: Option<JoinHandle<()>>,
/// Monotonic instant captured in `from_spawned`; basis for `elapsed`.
started: Instant,
/// Wall-clock time captured in `from_spawned`; returned by `start_time`.
start_time: SystemTime,
}
impl RunningProcess {
pub(crate) fn from_spawned(s: Spawned) -> Self {
Self {
program: s.program,
child: s.child,
own_group: s.own_group.map(Arc::new),
stdout_pipe: s.stdout,
stderr_pipe: s.stderr,
stdin_pipe: s.stdin,
stdin_task: s.stdin_task,
timeout: s.timeout,
pid: s.pid,
stdout_encoding: s.stdout_encoding,
stderr_encoding: s.stderr_encoding,
stdout_handler: s.stdout_handler,
stderr_handler: s.stderr_handler,
buffer: s.buffer,
stdout_sink: None,
stderr_sink: None,
stderr_pump: None,
deadline_task: None,
#[cfg(feature = "cancellation")]
cancel_token: s.cancel_token,
#[cfg(feature = "cancellation")]
cancel_task: None,
started: Instant::now(),
start_time: SystemTime::now(),
}
}
pub(crate) fn attach_group(&mut self, group: ProcessGroup) {
self.own_group = Some(Arc::new(group));
}
pub(crate) fn take_stdout_pipe(&mut self) -> Option<ChildStdout> {
self.stdout_pipe.take()
}
pub(crate) fn program_name(&self) -> &str {
&self.program
}
}
impl std::fmt::Debug for RunningProcess {
    /// Prints only `program`, `pid` and `timeout`; the remaining fields are
    /// elided and the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("RunningProcess");
        repr.field("program", &self.program);
        repr.field("pid", &self.pid);
        repr.field("timeout", &self.timeout);
        repr.finish_non_exhaustive()
    }
}
impl RunningProcess {
/// OS process id recorded at spawn time, or `None` if it was not known.
pub fn pid(&self) -> Option<u32> {
self.pid
}
/// Wall-clock time captured when this `RunningProcess` was constructed.
pub fn start_time(&self) -> SystemTime {
self.start_time
}
/// Time elapsed on the monotonic clock since this `RunningProcess` was
/// constructed.
pub fn elapsed(&self) -> Duration {
self.started.elapsed()
}
/// CPU time currently reported for the process by `crate::sys::process_metrics`.
///
/// Returns `None` when the pid is unknown or the metric is unavailable.
#[cfg(feature = "stats")]
pub fn cpu_time(&self) -> Option<Duration> {
    let pid = self.pid?;
    crate::sys::process_metrics(pid).cpu_time
}
/// Peak memory (in bytes) currently reported for the process by
/// `crate::sys::process_metrics`.
///
/// Returns `None` when the pid is unknown or the metric is unavailable.
#[cfg(feature = "stats")]
pub fn peak_memory_bytes(&self) -> Option<u64> {
    let pid = self.pid?;
    crate::sys::process_metrics(pid).peak_memory_bytes
}
/// Count reported by the stdout line sink, or `0` while no sink is attached.
pub fn stdout_line_count(&self) -> usize {
    match &self.stdout_sink {
        Some(sink) => sink.count(),
        None => 0,
    }
}
/// Count reported by the stderr line sink, or `0` while no sink is attached.
pub fn stderr_line_count(&self) -> usize {
    match &self.stderr_sink {
        Some(sink) => sink.count(),
        None => 0,
    }
}
/// Hands out the child's stdin wrapped in a [`ProcessStdin`].
///
/// The pipe is moved out, so only the first call can return `Some`; later
/// calls (or a process without a stdin pipe) yield `None`.
pub fn standard_input(&mut self) -> Option<ProcessStdin> {
    let pipe = self.stdin_pipe.take()?;
    Some(ProcessStdin::new(pipe))
}
pub async fn output_string(mut self) -> Result<ProcessResult<String>> {
let stdout_sink = SharedLines::new(&self.buffer);
let stderr_sink = SharedLines::new(&self.buffer);
let pumps = self.spawn_line_pumps(&stdout_sink, &stderr_sink);
self.stdout_sink = Some(stdout_sink.clone());
self.stderr_sink = Some(stderr_sink.clone());
let outcome = self.drive_to_exit().await?;
join_pumps(pumps).await;
let (code, timed_out) = self.checked_outcome(outcome)?;
Ok(ProcessResult::new(
self.program.clone(),
stdout_sink.drain().join("\n"),
stderr_sink.drain().join("\n"),
code,
timed_out,
self.timeout,
))
}
/// Runs the process to completion, collecting stdout as raw bytes and stderr
/// as text lines joined with `\n`.
///
/// Stdout is read in 8 KiB chunks by a background task into a shared buffer,
/// so whatever was read is still available if that task has to be aborted
/// after the teardown grace period. A read error simply ends the capture.
///
/// # Errors
/// Propagates a failure to wait on the child and, with the `cancellation`
/// feature, the cancellation error when the token has been cancelled.
///
/// # Panics
/// Panics if the internal stdout buffer mutex is poisoned.
pub async fn output_bytes(mut self) -> Result<ProcessResult<Vec<u8>>> {
    let err_lines = SharedLines::new(&self.buffer);
    // Zero or one stderr pump, kept in a Vec so it can go straight to `join_pumps`.
    let mut stderr_readers = Vec::new();
    if let Some(pipe) = self.stderr_pipe.take() {
        stderr_readers.push(tokio::spawn(pump_lines(
            pipe,
            self.stderr_encoding,
            self.stderr_handler.clone(),
            Arc::clone(&err_lines),
        )));
    }
    self.stderr_sink = Some(Arc::clone(&err_lines));

    let captured = Arc::new(std::sync::Mutex::new(Vec::new()));
    let stdout_reader = {
        let captured = Arc::clone(&captured);
        let pipe = self.stdout_pipe.take();
        tokio::spawn(async move {
            let Some(mut pipe) = pipe else { return };
            let mut chunk = [0u8; 8 * 1024];
            // Stops on end-of-stream (0 bytes) or on the first read error.
            while let Ok(read) = pipe.read(&mut chunk).await {
                if read == 0 {
                    break;
                }
                captured
                    .lock()
                    .expect("stdout buffer poisoned")
                    .extend_from_slice(&chunk[..read]);
            }
        })
    };

    let exit = self.drive_to_exit().await?;

    // Bounded wait for the stdout reader; abort it if it is still running.
    let stopper = stdout_reader.abort_handle();
    if tokio::time::timeout(PUMP_TEARDOWN, stdout_reader)
        .await
        .is_err()
    {
        stopper.abort();
    }
    let stdout = std::mem::take(&mut *captured.lock().expect("stdout buffer poisoned"));

    join_pumps(stderr_readers).await;
    let (code, timed_out) = self.checked_outcome(exit)?;
    let stderr_text = err_lines.drain().join("\n");
    Ok(ProcessResult::new(
        self.program.clone(),
        stdout,
        stderr_text,
        code,
        timed_out,
        self.timeout,
    ))
}
/// Runs the process to completion and returns only its exit code.
///
/// Both output streams are still pumped (so handlers run and the child is
/// not blocked on unread pipes), but the collected lines are discarded.
///
/// # Errors
/// Propagates a failure to wait on the child and, with the `cancellation`
/// feature, the cancellation error when the token has been cancelled.
pub async fn wait(mut self) -> Result<Option<i32>> {
    let out_lines = SharedLines::new(&self.buffer);
    let err_lines = SharedLines::new(&self.buffer);
    let readers = self.spawn_line_pumps(&out_lines, &err_lines);

    let exit = self.drive_to_exit().await?;
    join_pumps(readers).await;
    self.checked_outcome(exit)
        .map(|(code, _timed_out)| code)
}
/// Waits for the child to exit and returns its exit code.
///
/// Unlike the consuming wait methods, this applies no timeout, starts no
/// output pumps and does not check for cancellation.
pub(crate) async fn wait_exit(&mut self) -> Result<Option<i32>> {
    let status = self.child.wait().await?;
    Ok(status.code())
}
/// Runs the process to completion while periodically sampling its metrics.
///
/// A background task polls `crate::sys::process_metrics` every `every`
/// (clamped to at least 1 ms), keeping the most recent CPU time and the
/// largest peak-memory value seen. Output is pumped and discarded. Without a
/// known pid no sampler runs and the metric fields stay empty.
///
/// # Errors
/// Propagates a failure to wait on the child and, with the `cancellation`
/// feature, the cancellation error when the token has been cancelled.
#[cfg(feature = "stats")]
pub async fn profile(mut self, every: Duration) -> Result<crate::stats::RunProfile> {
    use std::sync::{Arc, Mutex};

    /// Running totals shared between the sampler task and this method.
    #[derive(Default)]
    struct Acc {
        cpu_time: Option<Duration>,
        peak_memory_bytes: Option<u64>,
        samples: usize,
    }

    let period = every.max(Duration::from_millis(1));
    let began = self.started;
    let totals = Arc::new(Mutex::new(Acc::default()));

    let sampler = self.pid.map(|pid| {
        let totals = Arc::clone(&totals);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(period);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                ticker.tick().await;
                let snapshot = crate::sys::process_metrics(pid);
                // A poisoned lock just skips this sample.
                if let Ok(mut guard) = totals.lock() {
                    guard.samples += 1;
                    if snapshot.cpu_time.is_some() {
                        guard.cpu_time = snapshot.cpu_time;
                    }
                    guard.peak_memory_bytes =
                        match (guard.peak_memory_bytes, snapshot.peak_memory_bytes) {
                            (Some(prev), Some(now)) => Some(prev.max(now)),
                            (prev, now) => now.or(prev),
                        };
                }
            }
        })
    });

    let out_lines = SharedLines::new(&self.buffer);
    let err_lines = SharedLines::new(&self.buffer);
    let readers = self.spawn_line_pumps(&out_lines, &err_lines);

    let driven = self.drive_to_exit().await;
    // Stop sampling before any error is propagated.
    if let Some(handle) = sampler.as_ref() {
        handle.abort();
    }
    let exit = driven?;

    join_pumps(readers).await;
    let (exit_code, _timed_out) = self.checked_outcome(exit)?;
    let duration = began.elapsed();
    let (cpu_time, peak_memory_bytes, samples) = totals
        .lock()
        .map_or((None, None, 0), |t| (t.cpu_time, t.peak_memory_bytes, t.samples));

    Ok(crate::stats::RunProfile {
        exit_code,
        duration,
        cpu_time,
        peak_memory_bytes,
        samples,
    })
}
/// Starts a line-pump task for each output pipe that is still available.
///
/// Each pipe is moved out of `self`, so a stream can only be pumped once. The
/// returned handles are ordered stdout first, then stderr; the vector is
/// empty when neither pipe is present.
fn spawn_line_pumps(
    &mut self,
    stdout_sink: &Arc<SharedLines>,
    stderr_sink: &Arc<SharedLines>,
) -> Vec<JoinHandle<()>> {
    let stdout_pump = self.stdout_pipe.take().map(|pipe| {
        tokio::spawn(pump_lines(
            pipe,
            self.stdout_encoding,
            self.stdout_handler.clone(),
            Arc::clone(stdout_sink),
        ))
    });
    let stderr_pump = self.stderr_pipe.take().map(|pipe| {
        tokio::spawn(pump_lines(
            pipe,
            self.stderr_encoding,
            self.stderr_handler.clone(),
            Arc::clone(stderr_sink),
        ))
    });
    stdout_pump.into_iter().chain(stderr_pump).collect()
}
/// Turns an exit outcome into an error when the run was cancelled.
///
/// With the `cancellation` feature, a cancelled token takes precedence over
/// `outcome`. Without the feature, `outcome` is returned unchanged.
fn checked_outcome(&self, outcome: (Option<i32>, bool)) -> Result<(Option<i32>, bool)> {
    #[cfg(feature = "cancellation")]
    {
        if let Some(cancelled) = self.cancelled_error() {
            return Err(cancelled);
        }
    }
    Ok(outcome)
}
fn abort_watchdogs(&mut self) {
if let Some(task) = self.deadline_task.take() {
task.abort();
}
#[cfg(feature = "cancellation")]
if let Some(task) = self.cancel_task.take() {
task.abort();
}
}
/// Waits for the child to finish (or be killed), then stops the watchdog
/// tasks and, with the `tracing` feature, logs the outcome.
///
/// Returns `(exit_code, timed_out)`. If waiting fails, the error is returned
/// immediately and the watchdogs are left for `Drop` to abort.
async fn drive_to_exit(&mut self) -> Result<(Option<i32>, bool)> {
    let (code, timed_out) = self.drive_to_exit_inner().await?;
    self.abort_watchdogs();

    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "processkit",
        program = %self.program,
        code = ?code,
        timed_out,
        elapsed_ms = self.started.elapsed().as_millis() as u64,
        "process exited"
    );

    Ok((code, timed_out))
}
/// Waits for the child, enforcing the optional timeout (build without the
/// `cancellation` feature).
///
/// Returns `(exit_code, false)` on a normal exit. When the timeout elapses
/// first, the process tree is killed and `(None, true)` is returned.
#[cfg(not(feature = "cancellation"))]
async fn drive_to_exit_inner(&mut self) -> Result<(Option<i32>, bool)> {
    // No limit configured: plain wait.
    let Some(limit) = self.timeout else {
        let status = self.child.wait().await?;
        return Ok((status.code(), false));
    };

    let waited = tokio::time::timeout(limit, self.child.wait()).await;
    if let Ok(status) = waited {
        return Ok((status?.code(), false));
    }

    // Deadline elapsed before the child exited.
    self.kill_tree().await;
    Ok((None, true))
}
/// Waits for the child while racing against cancellation and the optional
/// timeout (build with the `cancellation` feature).
///
/// Returns `(exit_code, false)` when the child exits on its own,
/// `(None, false)` after a cancellation and `(None, true)` after a timeout;
/// in the latter two cases the process tree is killed first. A missing token
/// or timeout is modelled as a future that never completes.
///
/// The `select!` is `biased` with the wait branch first: if the child has
/// already exited when cancellation or the deadline also becomes ready, the
/// real exit code is reported instead of a randomly chosen branch. This
/// matches the non-`cancellation` build, where `tokio::time::timeout` polls
/// the wait before checking the deadline.
#[cfg(feature = "cancellation")]
async fn drive_to_exit_inner(&mut self) -> Result<(Option<i32>, bool)> {
    let limit = self.timeout;
    let token = self.cancel_token.clone();
    let cancelled = async {
        match &token {
            Some(token) => token.cancelled().await,
            None => std::future::pending::<()>().await,
        }
    };
    let deadline = async {
        match limit {
            Some(limit) => tokio::time::sleep(limit).await,
            None => std::future::pending::<()>().await,
        }
    };
    tokio::select! {
        // Poll branches top to bottom so a finished child always wins the race.
        biased;
        status = self.child.wait() => Ok((status?.code(), false)),
        () = cancelled => {
            self.kill_tree().await;
            Ok((None, false))
        }
        () = deadline => {
            self.kill_tree().await;
            Ok((None, true))
        }
    }
}
/// Best-effort kill of the child and, when present, its owned process group,
/// followed by a wait on the child.
///
/// Errors from every step are ignored.
async fn kill_tree(&mut self) {
let _ = self.child.start_kill();
if let Some(group) = &self.own_group {
let _ = group.terminate_all();
}
// Wait on the child after the kill request; the result is discarded.
let _ = self.child.wait().await;
}
/// Returns `Error::Cancelled` for this program when a cancellation token is
/// present and has been cancelled; otherwise `None`.
#[cfg(feature = "cancellation")]
fn cancelled_error(&self) -> Option<Error> {
    let token = self.cancel_token.as_ref()?;
    token.is_cancelled().then(|| Error::Cancelled {
        program: self.program.clone(),
    })
}
/// Requests that the direct child be killed, without waiting for it to exit.
///
/// Only the child handle is signalled; the owned process group is not
/// touched by this method.
///
/// # Errors
/// Returns the converted I/O error if the kill request fails.
pub fn start_kill(&mut self) -> Result<()> {
    Ok(self.child.start_kill()?)
}
}
impl Drop for RunningProcess {
    /// Aborts the helper tasks still attached to this process: the stdin task
    /// first, then the watchdogs. The child itself is not signalled here.
    fn drop(&mut self) {
        if let Some(stdin_job) = self.stdin_task.take() {
            stdin_job.abort();
        }
        self.abort_watchdogs();
    }
}
/// Awaits the given pump tasks one after another, within a single
/// `PUMP_TEARDOWN` budget for all of them.
///
/// If the budget runs out, every task is aborted (a no-op for tasks that
/// already finished). Join errors are ignored. Returns immediately for an
/// empty list.
async fn join_pumps(tasks: Vec<JoinHandle<()>>) {
    if tasks.is_empty() {
        return;
    }

    // Collect abort handles up front: the join handles move into the future below.
    let mut stoppers = Vec::with_capacity(tasks.len());
    for task in &tasks {
        stoppers.push(task.abort_handle());
    }

    let all_finished = async move {
        for task in tasks {
            let _ = task.await;
        }
    };

    match tokio::time::timeout(PUMP_TEARDOWN, all_finished).await {
        Ok(()) => {}
        Err(_elapsed) => stoppers.iter().for_each(|stopper| stopper.abort()),
    }
}