use std::time::Duration;
use tokio::process::{Child, Command};
use crate::error::{Error, Result};
#[cfg(feature = "limits")]
use crate::limits::ResourceLimits;
use crate::mechanism::Mechanism;
#[cfg(feature = "process-control")]
use crate::signal::Signal;
#[cfg(feature = "stats")]
use crate::stats::ProcessGroupStats;
use crate::sys::Job;
#[cfg_attr(
feature = "limits",
doc = "",
doc = "[`limits`](Self::limits) caps the whole tree's memory, process count, and CPU;",
doc = "it is applied at group creation and only where a real container exists (Windows",
doc = "Job Object or Linux cgroup v2) — see [`ResourceLimits`]."
)]
#[derive(Debug, Clone)]
pub struct ProcessGroupOptions {
pub shutdown_timeout: Duration,
pub escalate_to_kill: bool,
#[cfg(feature = "limits")]
pub limits: ResourceLimits,
}
impl Default for ProcessGroupOptions {
fn default() -> Self {
Self {
shutdown_timeout: Duration::from_secs(2),
escalate_to_kill: true,
#[cfg(feature = "limits")]
limits: ResourceLimits::default(),
}
}
}
#[cfg(feature = "limits")]
impl ProcessGroupOptions {
#[must_use]
pub fn memory_max(mut self, bytes: u64) -> Self {
self.limits.memory_max = Some(bytes);
self
}
#[must_use]
pub fn max_processes(mut self, n: u32) -> Self {
self.limits.max_processes = Some(n);
self
}
#[must_use]
pub fn cpu_quota(mut self, cores: f64) -> Self {
self.limits.cpu_quota = Some(cores);
self
}
}
pub struct ProcessGroup {
job: Job,
options: ProcessGroupOptions,
}
impl std::fmt::Debug for ProcessGroup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProcessGroup")
.field("mechanism", &self.mechanism())
.field("options", &self.options)
.finish_non_exhaustive()
}
}
impl ProcessGroup {
pub fn new() -> Result<Self> {
Self::with_options(ProcessGroupOptions::default())
}
#[cfg_attr(
feature = "limits",
doc = "",
doc = "If `options.limits` sets any cap, it is enforced now. When the active",
doc = "mechanism can't honor a requested limit (no cgroup/Job Object, or a Linux",
doc = "cgroup without controller delegation) this returns",
doc = "[`Error::ResourceLimit`] rather than handing back an unbounded group."
)]
pub fn with_options(options: ProcessGroupOptions) -> Result<Self> {
#[cfg(feature = "limits")]
let job = {
validate_limits(&options.limits)?;
Job::new(&options.limits).map_err(|source| {
if options.limits.any() {
Error::ResourceLimit(source.to_string())
} else {
Error::Io(source)
}
})?
};
#[cfg(not(feature = "limits"))]
let job = Job::new()?;
Ok(Self { job, options })
}
pub fn spawn(&self, cmd: &mut Command) -> Result<Child> {
self.spawn_with_options(cmd, &crate::sys::SpawnOptions::default())
}
pub(crate) fn spawn_with_options(
&self,
cmd: &mut Command,
opts: &crate::sys::SpawnOptions,
) -> Result<Child> {
let child = self.job.spawn(cmd, opts).map_err(|source| Error::Spawn {
program: program_name(cmd),
source,
})?;
Ok(child)
}
#[cfg(feature = "process-control")]
pub fn adopt(&self, child: &Child) -> Result<()> {
self.job.adopt(child)?;
#[cfg(feature = "tracing")]
tracing::trace!(
target: "processkit",
mechanism = ?self.mechanism(),
pid = ?child.id(),
"adopted an externally spawned child"
);
Ok(())
}
pub fn terminate_all(&self) -> Result<()> {
#[cfg(feature = "tracing")]
tracing::debug!(
target: "processkit",
mechanism = ?self.mechanism(),
"terminating every process in the group"
);
self.job.kill_all()?;
Ok(())
}
#[cfg(feature = "process-control")]
pub fn signal(&self, sig: Signal) -> Result<()> {
self.job
.signal(sig)
.map_err(|source| map_unsupported(source, format!("signal({sig:?})")))
}
#[cfg(feature = "process-control")]
pub fn suspend(&self) -> Result<()> {
self.job
.suspend()
.map_err(|source| map_unsupported(source, "suspend"))
}
#[cfg(feature = "process-control")]
pub fn resume(&self) -> Result<()> {
self.job
.resume()
.map_err(|source| map_unsupported(source, "resume"))
}
#[cfg(feature = "process-control")]
pub fn members(&self) -> Result<Vec<u32>> {
let pids = self.job.members()?;
Ok(pids)
}
pub async fn shutdown(self) -> Result<()> {
#[cfg(feature = "tracing")]
tracing::debug!(
target: "processkit",
mechanism = ?self.mechanism(),
timeout_ms = self.options.shutdown_timeout.as_millis() as u64,
escalate = self.options.escalate_to_kill,
"graceful shutdown: TERM, grace, then KILL"
);
self.job
.graceful_shutdown(self.options.shutdown_timeout, self.options.escalate_to_kill)
.await?;
Ok(())
}
#[cfg(feature = "stats")]
pub fn stats(&self) -> Result<ProcessGroupStats> {
let stats = self.job.stats()?;
Ok(stats)
}
#[cfg(feature = "stats")]
pub fn sample_stats(&self, every: Duration) -> crate::stats::StatsSampler<'_> {
crate::stats::StatsSampler::new(self, every)
}
pub fn mechanism(&self) -> Mechanism {
self.job.mechanism()
}
}
fn program_name(cmd: &Command) -> String {
cmd.as_std().get_program().to_string_lossy().into_owned()
}
#[cfg(feature = "process-control")]
fn map_unsupported(source: std::io::Error, operation: impl Into<String>) -> Error {
if source.kind() == std::io::ErrorKind::Unsupported {
Error::Unsupported {
operation: operation.into(),
}
} else {
Error::Io(source)
}
}
#[cfg(feature = "limits")]
fn validate_limits(limits: &ResourceLimits) -> Result<()> {
if limits.memory_max == Some(0) {
return Err(Error::ResourceLimit(
"memory_max must be greater than 0".into(),
));
}
if limits.max_processes == Some(0) {
return Err(Error::ResourceLimit(
"max_processes must be greater than 0".into(),
));
}
if let Some(cores) = limits.cpu_quota
&& !(cores.is_finite() && cores > 0.0)
{
return Err(Error::ResourceLimit(
"cpu_quota must be a finite value greater than 0".into(),
));
}
Ok(())
}
#[cfg(all(test, feature = "limits"))]
mod tests {
use super::*;
#[test]
fn builders_set_limits() {
let opts = ProcessGroupOptions::default()
.memory_max(1024)
.max_processes(8)
.cpu_quota(0.5);
assert_eq!(opts.limits.memory_max, Some(1024));
assert_eq!(opts.limits.max_processes, Some(8));
assert_eq!(opts.limits.cpu_quota, Some(0.5));
assert!(opts.limits.any());
}
#[test]
fn default_options_have_no_limits() {
let opts = ProcessGroupOptions::default();
assert!(!opts.limits.any());
}
#[test]
fn validate_rejects_nonsense() {
for opts in [
ProcessGroupOptions::default().memory_max(0),
ProcessGroupOptions::default().max_processes(0),
ProcessGroupOptions::default().cpu_quota(0.0),
ProcessGroupOptions::default().cpu_quota(-1.0),
ProcessGroupOptions::default().cpu_quota(f64::NAN),
ProcessGroupOptions::default().cpu_quota(f64::INFINITY),
] {
assert!(matches!(
validate_limits(&opts.limits),
Err(Error::ResourceLimit(_))
));
assert!(matches!(
ProcessGroup::with_options(opts),
Err(Error::ResourceLimit(_))
));
}
}
}