use nix::sys::signal::{self, Signal};
use nix::sys::wait::{self, WaitPidFlag, WaitStatus};
use nix::unistd::Pid;
use serde::{Deserialize, Serialize};
use std::io::prelude::*;
use std::io::{PipeReader, PipeWriter};
use std::thread;
use std::{fmt, str};
use tempfile::TempDir;
use crate::{Command, ProcPidSmapsRollup, ProcPidStatus, Rusage, error::*};
/// Outcome of a finished process.
///
/// Values are either built in this file (`ExitStatus::new_failure`,
/// `ExitStatus::from_wait_status`) or decoded with `postcard` in
/// `Child::retrieve_exit_status_internal_process`.
///
/// NOTE(review): postcard decoding is presumably sensitive to field order, so
/// confirm against the encoding side before reordering or inserting fields.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ExitStatus {
/// Overall status code; `0` is what `success()` treats as success.
/// `from_wait_status` stores the process exit code here, or `128 + signal`
/// when the process was terminated by a signal; `new_failure` stores `125`.
pub code: i32,
/// Human-readable description of why the process ended.
pub reason: String,
/// The process's own exit code; `from_wait_status` sets it to `Some` only
/// when the process exited normally, and `new_failure` leaves it `None`.
pub exit_code: Option<i32>,
/// Optional resource usage (real/user/system time are logged by
/// `Child::logging`). The constructors in this file always leave it `None`;
/// presumably it is filled in by whoever serializes the status — verify.
pub rusage: Option<Rusage>,
/// Optional memory metrics logged by `Child::logging` in kB; the name
/// suggests `/proc/<pid>/smaps_rollup` as the origin — TODO confirm.
pub proc_pid_smaps_rollup: Option<ProcPidSmapsRollup>,
/// Optional memory metrics logged by `Child::logging` in kB; the name
/// suggests `/proc/<pid>/status` as the origin — TODO confirm.
pub proc_pid_status: Option<ProcPidStatus>,
}
impl ExitStatus {
    /// Value of `code` that `success()` treats as a successful run.
    pub(crate) const SUCCESS: i32 = 0;
    /// Value of `code` assigned by `new_failure`.
    pub(crate) const FAILURE: i32 = 125;

    /// Builds a failure status carrying `reason`.
    ///
    /// `code` is set to `FAILURE`; `exit_code` and all metric fields are `None`.
    pub(crate) fn new_failure(reason: &str) -> Self {
        Self {
            code: Self::FAILURE,
            reason: reason.to_owned(),
            exit_code: None,
            rusage: None,
            proc_pid_smaps_rollup: None,
            proc_pid_status: None,
        }
    }

    /// Translates a `waitpid` result for `command` into an `ExitStatus`.
    ///
    /// * `Exited`   -> `code` and `exit_code` both carry the exit code.
    /// * `Signaled` -> `code` is `128 + signal`, `exit_code` is `None`.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) for any other `WaitStatus` variant.
    pub(crate) fn from_wait_status(ws: &WaitStatus, command: &Command) -> Self {
        let program = command.get_program();
        let (code, reason, exit_code) = match *ws {
            WaitStatus::Exited(_, status) => (
                status,
                format!("process({program}) exited with code {status}"),
                Some(status),
            ),
            WaitStatus::Signaled(_, signal, _) => (
                128 + signal as i32,
                format!("process({program}) received signal {signal}"),
                None,
            ),
            _ => unreachable!("ExitStatus::from_wait_status"),
        };
        Self {
            code,
            reason,
            exit_code,
            rusage: None,
            proc_pid_smaps_rollup: None,
            proc_pid_status: None,
        }
    }

    /// Returns `true` when `code` equals `SUCCESS`.
    pub fn success(&self) -> bool {
        Self::SUCCESS == self.code
    }
}
/// Result of `Child::wait_with_output`: the exit status together with
/// everything that was read from the child's stdout and stderr pipes.
pub struct Output {
/// Exit status returned by `Child::wait`.
pub status: ExitStatus,
/// Bytes read from the child's stdout pipe; empty when no pipe was present.
pub stdout: Vec<u8>,
/// Bytes read from the child's stderr pipe; empty when no pipe was present.
pub stderr: Vec<u8>,
}
impl fmt::Debug for Output {
    /// Formats the output for diagnostics.
    ///
    /// `stdout` / `stderr` are shown as strings when they are valid UTF-8 and
    /// as raw byte vectors otherwise.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let out_text = str::from_utf8(&self.stdout);
        let err_text = str::from_utf8(&self.stderr);

        // Pick the textual view when decoding succeeded, the bytes otherwise.
        let out_view: &dyn fmt::Debug = if let Ok(text) = &out_text {
            text
        } else {
            &self.stdout
        };
        let err_view: &dyn fmt::Debug = if let Ok(text) = &err_text {
            text
        } else {
            &self.stderr
        };

        let mut builder = fmt.debug_struct("Output");
        builder.field("status", &self.status);
        builder.field("stdout", out_view);
        builder.field("stderr", err_view);
        builder.finish()
    }
}
/// Handle to a spawned child process.
///
/// The exit status is cached in `status` once it is known; `kill`,
/// `try_wait` and `wait` all short-circuit on the cached value.
pub struct Child {
/// Process id used for `waitpid`, `kill` and `id()`.
pid: Pid,
/// Cached exit status; `None` until the child has been waited for
/// (or when `new` was not given a status up front).
status: Option<ExitStatus>,
/// Pipe from which a postcard-encoded `ExitStatus` is read after the
/// child has been reaped; taken (left as `None`) on the first read.
status_reader: Option<PipeReader>,
/// When `false`, one leading byte is read from `status_reader` and discarded
/// before the encoded status. The meaning of that byte is not visible
/// here — TODO confirm against the writer side.
status_reader_noleading: bool,
/// Temporary directory held for the child's lifetime; dropped once the
/// exit status has been retrieved.
tmpdir: Option<TempDir>,
/// Cgroup manager (only with the `cgroups` feature); dropped once the exit
/// status has been retrieved.
#[cfg(feature = "cgroups")]
cgroup: Option<crate::cgroups::Manager>,
/// Write end of the child's stdin pipe, if any; `wait` and
/// `wait_with_output` take and drop it before waiting.
pub stdin: Option<PipeWriter>,
/// Read end of the child's stdout pipe, if any; consumed by `wait_with_output`.
pub stdout: Option<PipeReader>,
/// Read end of the child's stderr pipe, if any; consumed by `wait_with_output`.
pub stderr: Option<PipeReader>,
}
impl Child {
/// Assembles a `Child` from the pieces produced while spawning.
///
/// `status_reader` is always stored as `Some`; `status` may already be
/// set, in which case later waits return it without calling `waitpid`.
// The constructor mirrors the struct's fields one-to-one, hence the long
// argument list.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
    pid: Pid,
    stdin: Option<PipeWriter>,
    stdout: Option<PipeReader>,
    stderr: Option<PipeReader>,
    status_reader: PipeReader,
    status_reader_noleading: bool,
    status: Option<ExitStatus>,
    tmpdir: Option<TempDir>,
    #[cfg(feature = "cgroups")] cgroup: Option<crate::cgroups::Manager>,
) -> Self {
    let status_reader = Some(status_reader);
    Self {
        pid,
        status,
        status_reader,
        status_reader_noleading,
        tmpdir,
        #[cfg(feature = "cgroups")]
        cgroup,
        stdin,
        stdout,
        stderr,
    }
}
/// Returns the child's process id as a `u32`.
pub fn id(&self) -> u32 {
    let raw = self.pid.as_raw();
    raw as u32
}
/// Sends `SIGKILL` to the child unless its exit status is already cached.
///
/// Delivery is best effort: an error from the `kill` call is ignored and
/// the method always returns `Ok(())`.
pub fn kill(&mut self) -> Result<()> {
    if self.status.is_none() {
        // Deliberately ignore the outcome of the signal delivery.
        let _ = signal::kill(self.pid, Signal::SIGKILL);
    }
    Ok(())
}
/// Checks, without blocking, whether the child has finished.
///
/// Returns the cached status when one exists. Otherwise calls `waitpid`
/// with `WNOHANG`: `Ok(None)` means the child is still alive, any other
/// wait status is turned into an `ExitStatus`.
///
/// # Errors
///
/// Returns an error when `waitpid` fails or the exit status cannot be
/// retrieved.
pub fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
    if let Some(cached) = self.status.as_ref() {
        return Ok(Some(cached.clone()));
    }

    let ws = wait::waitpid(self.pid, Some(WaitPidFlag::WNOHANG))
        .map_err(ProcessErrorKind::NixError)?;
    match ws {
        WaitStatus::StillAlive => Ok(None),
        finished => self.retrieve_exit_status(finished).map(Some),
    }
}
/// Blocks until the child has finished and returns its exit status.
///
/// The stdin handle, if still held, is dropped first. A cached status is
/// returned without calling `waitpid` again.
///
/// # Errors
///
/// Returns an error when `waitpid` fails or the exit status cannot be
/// retrieved.
pub fn wait(&mut self) -> Result<ExitStatus> {
    // Release our end of the child's stdin before waiting.
    if let Some(writer) = self.stdin.take() {
        drop(writer);
    }

    if let Some(cached) = self.status.as_ref() {
        return Ok(cached.clone());
    }

    let ws = wait::waitpid(self.pid, None).map_err(ProcessErrorKind::NixError)?;
    self.retrieve_exit_status(ws)
}
fn retrieve_exit_status(&mut self, ws: WaitStatus) -> Result<ExitStatus> {
if let WaitStatus::Signaled(_, Signal::SIGKILL, _) = ws {
let reason = "container received signal SIGKILL";
self.status = Some(ExitStatus::new_failure(reason));
}
if self.status.is_none() {
self.retrieve_exit_status_internal_process()?;
}
self.logging();
drop(self.tmpdir.take());
#[cfg(feature = "cgroups")]
drop(self.cgroup.take());
let s = self.status.clone();
s.ok_or(Error::ProcessError(ProcessErrorKind::ChildExitStatusGone))
}
/// Reads and decodes the `ExitStatus` sent over the status pipe.
///
/// Does nothing when the pipe has already been consumed. Unless
/// `status_reader_noleading` is set, one leading byte is read and discarded
/// first; the remainder of the stream is decoded with `postcard` and stored
/// in `self.status`.
///
/// # Errors
///
/// Returns an error when reading from the pipe or decoding fails.
fn retrieve_exit_status_internal_process(&mut self) -> Result<()> {
    let Some(mut reader) = self.status_reader.take() else {
        return Ok(());
    };

    if !self.status_reader_noleading {
        // Skip the single byte that precedes the encoded status.
        let mut leading = [0u8; 1];
        reader
            .read_exact(&mut leading)
            .map_err(ProcessErrorKind::StdIoError)?;
    }

    let mut payload = Vec::new();
    reader
        .read_to_end(&mut payload)
        .map_err(ProcessErrorKind::StdIoError)?;
    drop(reader);

    let decoded =
        postcard::from_bytes(payload.as_slice()).map_err(ProcessErrorKind::PostcardError)?;
    self.status = Some(decoded);
    Ok(())
}
/// Emits the exit reason and any collected metrics at debug level.
///
/// Returns immediately when debug logging is disabled. When no status is
/// cached, only the separator and `Exited: NULL` are logged.
fn logging(&self) {
    if !log::log_enabled!(log::Level::Debug) {
        return;
    }

    log::debug!("================================");
    let Some(status) = self.status.as_ref() else {
        log::debug!("Exited: NULL");
        return;
    };
    log::debug!("Exited: {}", status.reason);

    if let Some(usage) = &status.rusage {
        log::debug!("Metric: RealTime: {:>12} sec", usage.real_time.as_secs_f64());
        log::debug!("Metric: UserTime: {:>12} sec", usage.user_time.as_secs_f64());
        log::debug!("Metric: SysTime: {:>12} sec", usage.system_time.as_secs_f64());
    }

    if let Some(smaps) = &status.proc_pid_smaps_rollup {
        log::debug!("Metric: Rss: {:>12} kB", smaps.rss);
        log::debug!("Metric: Shared_Dirty: {:>12} kB", smaps.shared_dirty);
        log::debug!("Metric: Shared_Clean: {:>12} kB", smaps.shared_clean);
        log::debug!("Metric: Private_Dirty: {:>12} kB", smaps.private_dirty);
        log::debug!("Metric: Private_Clean: {:>12} kB", smaps.private_clean);
        log::debug!("Metric: Pss: {:>12} kB", smaps.pss);
        log::debug!("Metric: Pss_Dirty: {:>12} kB", smaps.pss_dirty);
        log::debug!("Metric: Pss_Anon: {:>12} kB", smaps.pss_anon);
        log::debug!("Metric: Pss_File: {:>12} kB", smaps.pss_file);
        log::debug!("Metric: Pss_Shmem: {:>12} kB", smaps.pss_shmem);
    }

    if let Some(proc_status) = &status.proc_pid_status {
        log::debug!("Metric: VmPeak: {:>12} kB", proc_status.vmpeak);
        log::debug!("Metric: VmSize: {:>12} kB", proc_status.vmsize);
        log::debug!("Metric: VmHWM: {:>12} kB", proc_status.vmhwm);
        log::debug!("Metric: VmRSS: {:>12} kB", proc_status.vmrss);
        log::debug!("Metric: VmData: {:>12} kB", proc_status.vmdata);
        log::debug!("Metric: VmStk: {:>12} kB", proc_status.vmstk);
        log::debug!("Metric: VmExe: {:>12} kB", proc_status.vmexe);
        log::debug!("Metric: VmLib: {:>12} kB", proc_status.vmlib);
        log::debug!("Metric: VmPTE: {:>12} kB", proc_status.vmpte);
        log::debug!("Metric: VmSwap: {:>12} kB", proc_status.vmswap);
        log::debug!("Metric: RssAnon: {:>12} kB", proc_status.rssanon);
        log::debug!("Metric: RssFile: {:>12} kB", proc_status.rssfile);
        log::debug!("Metric: RssShmem: {:>12} kB", proc_status.rssshmem);
    }
}
/// Collects the child's remaining output and waits for it to finish.
///
/// The stdin handle is dropped first. Whichever of the stdout/stderr pipes
/// are present are read to end (both concurrently when both exist), then
/// the child is waited for.
///
/// # Errors
///
/// Returns an error when reading a pipe fails, a reader thread panics, or
/// waiting for the child fails.
pub fn wait_with_output(&mut self) -> Result<Output> {
    drop(self.stdin.take());

    let mut stdout = Vec::new();
    let mut stderr = Vec::new();

    let out_pipe = self.stdout.take();
    let err_pipe = self.stderr.take();
    match (out_pipe, err_pipe) {
        (Some(mut out), Some(mut err)) => {
            // Drain both pipes in parallel so neither can stall the child.
            self.read2(&mut out, &mut stdout, &mut err, &mut stderr)?;
        }
        (Some(mut out), None) => {
            out.read_to_end(&mut stdout)
                .map_err(ProcessErrorKind::StdIoError)?;
        }
        (None, Some(mut err)) => {
            err.read_to_end(&mut stderr)
                .map_err(ProcessErrorKind::StdIoError)?;
        }
        (None, None) => {}
    }

    let status = self.wait()?;
    Ok(Output {
        status,
        stdout,
        stderr,
    })
}
fn read2(
&mut self,
out: &mut PipeReader,
stdout: &mut Vec<u8>,
err: &mut PipeReader,
stderr: &mut Vec<u8>,
) -> Result<()> {
thread::scope(|s| {
let throut = s.spawn(move || out.read_to_end(stdout).map(drop));
let threrr = s.spawn(move || err.read_to_end(stderr).map(drop));
let r = throut.join();
match r {
Err(_) => return Err(ProcessErrorKind::StdThreadPanic),
Ok(Err(e)) => return Err(ProcessErrorKind::StdIoError(e)),
Ok(Ok(_)) => {}
}
let r = threrr.join();
match r {
Err(_) => Err(ProcessErrorKind::StdThreadPanic),
Ok(r) => r.map_err(ProcessErrorKind::StdIoError),
}
})?;
Ok(())
}
}