use std::io;
use std::time::Duration;
use tokio::process::{Child, Command};
#[cfg(feature = "stats")]
use windows_sys::Win32::Foundation::FILETIME;
use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE};
#[cfg(feature = "process-control")]
use windows_sys::Win32::Foundation::{ERROR_INVALID_PARAMETER, ERROR_MORE_DATA};
use windows_sys::Win32::System::Diagnostics::ToolHelp::{
CreateToolhelp32Snapshot, TH32CS_SNAPTHREAD, THREADENTRY32, Thread32First, Thread32Next,
};
#[cfg(any(feature = "process-control", feature = "stats"))]
use windows_sys::Win32::System::JobObjects::QueryInformationJobObject;
use windows_sys::Win32::System::JobObjects::{
AssignProcessToJobObject, CreateJobObjectW, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JobObjectExtendedLimitInformation,
SetInformationJobObject, TerminateJobObject,
};
#[cfg(feature = "limits")]
use windows_sys::Win32::System::JobObjects::{
JOB_OBJECT_CPU_RATE_CONTROL_ENABLE, JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP,
JOB_OBJECT_LIMIT_ACTIVE_PROCESS, JOB_OBJECT_LIMIT_JOB_MEMORY,
JOBOBJECT_CPU_RATE_CONTROL_INFORMATION, JobObjectCpuRateControlInformation,
};
#[cfg(feature = "stats")]
use windows_sys::Win32::System::JobObjects::{
JOBOBJECT_BASIC_ACCOUNTING_INFORMATION, JobObjectBasicAccountingInformation,
};
#[cfg(feature = "process-control")]
use windows_sys::Win32::System::JobObjects::{
JOBOBJECT_BASIC_PROCESS_ID_LIST, JobObjectBasicProcessIdList,
};
#[cfg(feature = "stats")]
use windows_sys::Win32::System::ProcessStatus::{K32GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};
use windows_sys::Win32::System::Threading::{
CREATE_SUSPENDED, OpenThread, ResumeThread, THREAD_SUSPEND_RESUME,
};
#[cfg(feature = "process-control")]
use windows_sys::Win32::System::Threading::{GetExitCodeProcess, SuspendThread};
#[cfg(feature = "stats")]
use windows_sys::Win32::System::Threading::{
GetProcessTimes, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
};
use crate::Mechanism;
#[cfg(feature = "process-control")]
use crate::Signal;
#[cfg(feature = "limits")]
use crate::limits::ResourceLimits;
#[cfg(feature = "stats")]
use crate::stats::ProcessGroupStats;
#[cfg(feature = "stats")]
use crate::sys::ProcMetrics;
/// A job-object handle plus the bookkeeping this module keeps alongside it.
pub(crate) struct Job {
/// Raw job-object handle: created in `Job::new`, closed in `Drop`.
handle: HANDLE,
/// Taken by `spawn` while it assigns and resumes a new child, and by
/// `for_each_member_thread` while it walks member threads, so those two
/// operations never interleave.
suspend_lock: std::sync::Mutex<()>,
/// When set, `Drop` overwrites the job limits with zeroed structures before
/// closing the handle.
skip_drop_kill: super::SkipDropKill,
}
// SAFETY: NOTE(review) — these impls are written by hand, presumably because the
// raw `HANDLE` field stops the compiler from deriving them. In this file the
// handle is only ever handed to Win32 job-object calls and closed once in
// `Drop`; confirm those calls are safe to issue from any thread.
unsafe impl Send for Job {}
unsafe impl Sync for Job {}
impl Job {
/// Creates a job object and sets the `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` limit flag on it.
///
/// With the `limits` feature enabled, `limits.memory_max` and `limits.max_processes` are
/// folded into that same extended-limit call, and `limits.cpu_quota` is applied afterwards
/// as an enabled, hard-capped CPU rate.
///
/// # Errors
///
/// Returns the last OS error when the job cannot be created or a limit cannot be applied.
pub(crate) fn new(#[cfg(feature = "limits")] limits: &ResourceLimits) -> io::Result<Self> {
    // Both pointer arguments are passed as null.
    let raw = unsafe { CreateJobObjectW(std::ptr::null(), std::ptr::null()) };
    if raw.is_null() {
        return Err(io::Error::last_os_error());
    }

    // Wrap the handle straight away: any early return below drops `job`, and
    // `Drop for Job` closes the handle.
    let job = Job {
        handle: raw,
        suspend_lock: std::sync::Mutex::new(()),
        skip_drop_kill: super::SkipDropKill::new(),
    };

    // SAFETY: the structure is zero-initialised and then filled in field by field.
    let mut limit_info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { std::mem::zeroed() };
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    #[cfg(feature = "limits")]
    {
        if let Some(bytes) = limits.memory_max {
            limit_info.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
            // A value that does not fit in `usize` saturates to `usize::MAX`.
            limit_info.JobMemoryLimit = usize::try_from(bytes).unwrap_or(usize::MAX);
        }
        if let Some(count) = limits.max_processes {
            limit_info.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_ACTIVE_PROCESS;
            limit_info.BasicLimitInformation.ActiveProcessLimit = count;
        }
    }

    let limit_info_len = std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32;
    // SAFETY: the pointer refers to the live local `limit_info` and the length is its size.
    let applied = unsafe {
        SetInformationJobObject(
            job.handle,
            JobObjectExtendedLimitInformation,
            std::ptr::from_ref(&limit_info).cast(),
            limit_info_len,
        )
    };
    if applied == 0 {
        return Err(io::Error::last_os_error());
    }

    #[cfg(feature = "limits")]
    if let Some(cores) = limits.cpu_quota {
        // Fall back to a single CPU when the parallelism query fails.
        let cpus = match std::thread::available_parallelism() {
            Ok(count) => count.get() as f64,
            Err(_) => 1.0,
        };

        // SAFETY: the structure is zero-initialised and then filled in field by field.
        let mut rate_info: JOBOBJECT_CPU_RATE_CONTROL_INFORMATION =
            unsafe { std::mem::zeroed() };
        rate_info.ControlFlags =
            JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
        rate_info.Anonymous.CpuRate = cpu_hard_cap_rate(cores, cpus);

        let rate_info_len = std::mem::size_of::<JOBOBJECT_CPU_RATE_CONTROL_INFORMATION>() as u32;
        // SAFETY: the pointer refers to the live local `rate_info` and the length is its size.
        let applied = unsafe {
            SetInformationJobObject(
                job.handle,
                JobObjectCpuRateControlInformation,
                std::ptr::from_ref(&rate_info).cast(),
                rate_info_len,
            )
        };
        if applied == 0 {
            return Err(io::Error::last_os_error());
        }
    }

    Ok(job)
}
/// Spawns `cmd` with `CREATE_SUSPENDED` (plus `opts.creation_flags`), assigns the new
/// process to this job, and only then resumes its threads.
///
/// The child is held in an `UncontainedChildGuard` until every step has succeeded, so any
/// early return asks for the child to be killed.
///
/// # Errors
///
/// Returns the spawn error, an error if the child has no pid or handle, or the last OS
/// error from assigning or resuming the process.
pub(crate) fn spawn(
    &self,
    cmd: &mut Command,
    opts: &crate::sys::SpawnOptions,
) -> io::Result<Child> {
    use std::os::windows::process::CommandExt;

    let flags = CREATE_SUSPENDED | opts.creation_flags;
    cmd.as_std_mut().creation_flags(flags);

    let guard = UncontainedChildGuard::arm(cmd.spawn()?);

    // Same error for a missing pid and a missing handle.
    let exited_early =
        || io::Error::other("child exited before it could be assigned to the job");
    let pid = guard.child().id().ok_or_else(exited_early)?;
    let raw = guard.child().raw_handle().ok_or_else(exited_early)?;

    // Hold the same lock `for_each_member_thread` takes, for the whole assign + resume
    // sequence.
    let _serialized = self
        .suspend_lock
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // SAFETY: `raw` comes from the child still owned by `guard`; `self.handle` is the
    // job handle owned by `self`.
    let assigned = unsafe { AssignProcessToJobObject(self.handle, raw as HANDLE) };
    if assigned == 0 {
        return Err(io::Error::last_os_error());
    }

    resume_process_threads(pid)?;
    Ok(guard.disarm())
}
/// Assigns an already-spawned `child` to this job.
///
/// A failed assignment is reported as success when the child turns out to have exited.
///
/// # Errors
///
/// Returns an error if the child has no handle, or the OS error from the failed
/// assignment when the child has not exited.
#[cfg(feature = "process-control")]
pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
    let Some(raw) = child.raw_handle() else {
        return Err(io::Error::other("child has no handle (already exited?)"));
    };

    // SAFETY: `raw` belongs to the borrowed `child`; `self.handle` is owned by `self`.
    let assigned = unsafe { AssignProcessToJobObject(self.handle, raw as HANDLE) };
    if assigned != 0 {
        return Ok(());
    }

    // Capture the assignment error before the exit probe makes another OS call.
    let assign_err = io::Error::last_os_error();
    if process_has_exited(raw as HANDLE) {
        Ok(())
    } else {
        Err(assign_err)
    }
}
/// Calls `TerminateJobObject` on the job, passing `1` as the exit code.
///
/// # Errors
///
/// Returns the last OS error when the call reports failure.
pub(crate) fn kill_all(&self) -> io::Result<()> {
    // SAFETY: `self.handle` is the job handle owned by `self`.
    match unsafe { TerminateJobObject(self.handle, 1) } {
        0 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
/// Delivers `sig` to the job. Only `Signal::Kill` is handled, by forwarding to `kill_all`.
///
/// # Errors
///
/// Any other signal yields an `ErrorKind::Unsupported` error naming the signal;
/// `Signal::Kill` propagates whatever `kill_all` returns.
#[cfg(feature = "process-control")]
pub(crate) fn signal(&self, sig: Signal) -> io::Result<()> {
    if matches!(sig, Signal::Kill) {
        return self.kill_all();
    }
    let message = format!("signal({sig:?})");
    Err(io::Error::new(io::ErrorKind::Unsupported, message))
}
/// Suspends the threads of every job member by running
/// `for_each_member_thread` in suspend mode; its result is returned unchanged.
#[cfg(feature = "process-control")]
pub(crate) fn suspend(&self) -> io::Result<()> {
self.for_each_member_thread(true)
}
/// Resumes the threads of every job member by running
/// `for_each_member_thread` in resume mode; its result is returned unchanged.
#[cfg(feature = "process-control")]
pub(crate) fn resume(&self) -> io::Result<()> {
self.for_each_member_thread(false)
}
/// Returns the process ids `job_member_pids` reports for this job's handle.
#[cfg(feature = "process-control")]
pub(crate) fn members(&self) -> io::Result<Vec<u32>> {
job_member_pids(self.handle)
}
/// Suspends (`suspend == true`) or resumes (`suspend == false`) every thread in a thread
/// snapshot whose owning process is currently a member of the job.
///
/// Every matching thread is attempted even after a failure.
///
/// # Errors
///
/// Returns an error if the member list or the snapshot cannot be obtained; otherwise
/// returns the most recent per-thread error, if any thread failed.
#[cfg(feature = "process-control")]
fn for_each_member_thread(&self, suspend: bool) -> io::Result<()> {
    // Same lock `spawn` holds while it assigns and resumes a new child.
    let _serialized = self
        .suspend_lock
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let member_pids: std::collections::HashSet<u32> =
        job_member_pids(self.handle)?.into_iter().collect();
    if member_pids.is_empty() {
        return Ok(());
    }

    // The snapshot is not filtered by process (pid argument 0), so entries are matched
    // against `member_pids` below.
    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0) };
    if snapshot == INVALID_HANDLE_VALUE {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: the entry is zero-initialised and `dwSize` is set before first use.
    let mut thread: THREADENTRY32 = unsafe { std::mem::zeroed() };
    thread.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;

    let mut failure = None;
    // SAFETY: `snapshot` was checked above and `thread` is a live local.
    let mut more = unsafe { Thread32First(snapshot, &mut thread) } != 0;
    while more {
        if member_pids.contains(&thread.th32OwnerProcessID) {
            if let Err(err) = suspend_or_resume_thread(thread.th32ThreadID, suspend) {
                // Keep going; only the latest failure is reported.
                failure = Some(err);
            }
        }
        more = unsafe { Thread32Next(snapshot, &mut thread) } != 0;
    }

    // SAFETY: `snapshot` is a valid handle that is not used after this point.
    unsafe { CloseHandle(snapshot) };

    failure.map_or(Ok(()), Err)
}
/// Shuts the job down. `_signal` and `_timeout` are ignored by this implementation.
///
/// With `escalate` set the job is killed via `kill_all`. Otherwise nothing is killed and
/// `skip_drop_kill` is requested instead.
///
/// # Errors
///
/// Propagates the error from `kill_all` on the escalating path.
pub(crate) async fn graceful_shutdown(
    &self,
    _signal: i32,
    _timeout: Duration,
    escalate: bool,
) -> io::Result<()> {
    if !escalate {
        self.skip_drop_kill.request();
        return Ok(());
    }
    self.kill_all()
}
/// Collects group statistics from the job's basic-accounting and extended-limit
/// information.
///
/// The CPU time is the saturating sum of the job's user and kernel times, multiplied by
/// 100 to turn the raw value into nanoseconds.
///
/// # Errors
///
/// Returns the last OS error if either query fails.
#[cfg(feature = "stats")]
pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
    // SAFETY: zero-initialised output buffer, filled in by the query below.
    let mut accounting: JOBOBJECT_BASIC_ACCOUNTING_INFORMATION = unsafe { std::mem::zeroed() };
    let accounting_len = std::mem::size_of::<JOBOBJECT_BASIC_ACCOUNTING_INFORMATION>() as u32;
    // SAFETY: the pointer refers to the live local `accounting` and the length is its size.
    let queried = unsafe {
        QueryInformationJobObject(
            self.handle,
            JobObjectBasicAccountingInformation,
            std::ptr::from_mut(&mut accounting).cast(),
            accounting_len,
            std::ptr::null_mut(),
        )
    };
    if queried == 0 {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: zero-initialised output buffer, filled in by the query below.
    let mut extended: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { std::mem::zeroed() };
    let extended_len = std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32;
    // SAFETY: the pointer refers to the live local `extended` and the length is its size.
    let queried = unsafe {
        QueryInformationJobObject(
            self.handle,
            JobObjectExtendedLimitInformation,
            std::ptr::from_mut(&mut extended).cast(),
            extended_len,
            std::ptr::null_mut(),
        )
    };
    if queried == 0 {
        return Err(io::Error::last_os_error());
    }

    let user_ticks = accounting.TotalUserTime as u64;
    let kernel_ticks = accounting.TotalKernelTime as u64;
    let cpu_nanos = user_ticks.saturating_add(kernel_ticks).saturating_mul(100);

    Ok(ProcessGroupStats {
        active_process_count: accounting.ActiveProcesses as usize,
        total_cpu_time: Some(Duration::from_nanos(cpu_nanos)),
        peak_memory_bytes: Some(extended.PeakJobMemoryUsed as u64),
    })
}
/// Identifies this backend; always returns `Mechanism::JobObject`.
pub(crate) fn mechanism(&self) -> Mechanism {
Mechanism::JobObject
}
}
/// Reports whether the process behind `handle` has an exit code other than
/// `STILL_ACTIVE` (259).
///
/// Returns `false` when the exit code cannot be queried.
#[cfg(feature = "process-control")]
fn process_has_exited(handle: HANDLE) -> bool {
    const STILL_ACTIVE: u32 = 259;

    let mut exit_code = 0u32;
    // SAFETY: `exit_code` is a live local the call may write to.
    let queried = unsafe { GetExitCodeProcess(handle, &mut exit_code) };
    if queried == 0 {
        return false;
    }
    exit_code != STILL_ACTIVE
}
/// Holds a spawned child and calls `start_kill` on it when dropped, unless the
/// child was first taken back out with `disarm`.
struct UncontainedChildGuard {
/// `Some` while the guard is armed; `None` once `disarm` or `drop` has taken the child.
child: Option<Child>,
}
impl UncontainedChildGuard {
/// Wraps `child` in an armed guard.
fn arm(child: Child) -> Self {
Self { child: Some(child) }
}
/// Borrows the guarded child.
///
/// # Panics
///
/// Panics if the child has already been taken out of the guard.
fn child(&self) -> &Child {
self.child
.as_ref()
.expect("the guarded child is present until disarm")
}
/// Consumes the guard and hands the child back. The guard's `Drop` then
/// finds `None` and does nothing.
///
/// # Panics
///
/// Panics if the child has already been taken out of the guard.
fn disarm(mut self) -> Child {
self.child
.take()
.expect("the guarded child is taken exactly once")
}
}
impl Drop for UncontainedChildGuard {
    /// Requests a kill of the child if the guard is still holding one; the result of
    /// `start_kill` is deliberately ignored.
    fn drop(&mut self) {
        let Some(mut child) = self.child.take() else {
            // Already disarmed: nothing to clean up.
            return;
        };
        let _ = child.start_kill();
    }
}
/// Resumes every thread in a thread snapshot that is owned by process `pid`.
///
/// Succeeds as long as at least one thread was resumed, even if others failed.
///
/// # Errors
///
/// Returns the last OS error if the snapshot cannot be created. If no thread was resumed,
/// returns the most recent per-thread error, or a generic error when no thread of `pid`
/// failed (for example because none was found).
fn resume_process_threads(pid: u32) -> io::Result<()> {
    // The snapshot is not filtered by process (pid argument 0), so entries are matched
    // against `pid` below.
    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0) };
    if snapshot == INVALID_HANDLE_VALUE {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: the entry is zero-initialised and `dwSize` is set before first use.
    let mut thread: THREADENTRY32 = unsafe { std::mem::zeroed() };
    thread.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;

    let mut resumed_count = 0u32;
    let mut failure = None;
    // SAFETY: `snapshot` was checked above and `thread` is a live local.
    let mut more = unsafe { Thread32First(snapshot, &mut thread) } != 0;
    while more {
        if thread.th32OwnerProcessID == pid {
            if let Err(err) = resume_thread(thread.th32ThreadID) {
                failure = Some(err);
            } else {
                resumed_count += 1;
            }
        }
        more = unsafe { Thread32Next(snapshot, &mut thread) } != 0;
    }

    // SAFETY: `snapshot` is a valid handle that is not used after this point.
    unsafe { CloseHandle(snapshot) };

    if resumed_count > 0 {
        return Ok(());
    }
    Err(failure
        .unwrap_or_else(|| io::Error::other("no thread found to resume the contained child")))
}
/// Opens thread `tid` and calls `ResumeThread` repeatedly until the value it returns
/// is 0 or 1.
///
/// # Errors
///
/// Returns the last OS error if the thread cannot be opened or a resume call fails.
fn resume_thread(tid: u32) -> io::Result<()> {
    let thread = unsafe { OpenThread(THREAD_SUSPEND_RESUME, 0, tid) };
    if thread.is_null() {
        return Err(io::Error::last_os_error());
    }

    let outcome = loop {
        // SAFETY: `thread` was checked to be non-null above and is closed only after the loop.
        match unsafe { ResumeThread(thread) } {
            // `u32::MAX` is the failure value; read the OS error before any other call.
            u32::MAX => break Err(io::Error::last_os_error()),
            0 | 1 => break Ok(()),
            // A larger return value: call again.
            _ => {}
        }
    };

    // SAFETY: `thread` is a valid handle that is not used after this point.
    unsafe { CloseHandle(thread) };
    outcome
}
/// Opens thread `tid` and applies a single `SuspendThread` (when `suspend` is true) or a
/// single `ResumeThread` (otherwise) to it.
///
/// Failing to open the thread with `ERROR_INVALID_PARAMETER` is treated as success.
///
/// # Errors
///
/// Returns the last OS error for any other open failure, or when the suspend/resume call
/// itself fails.
#[cfg(feature = "process-control")]
fn suspend_or_resume_thread(tid: u32, suspend: bool) -> io::Result<()> {
    let thread = unsafe { OpenThread(THREAD_SUSPEND_RESUME, 0, tid) };
    if thread.is_null() {
        let open_err = io::Error::last_os_error();
        return match open_err.raw_os_error() {
            Some(code) if code == ERROR_INVALID_PARAMETER as i32 => Ok(()),
            _ => Err(open_err),
        };
    }

    // SAFETY: `thread` was checked to be non-null above and is closed only afterwards.
    let previous = if suspend {
        unsafe { SuspendThread(thread) }
    } else {
        unsafe { ResumeThread(thread) }
    };

    // `u32::MAX` is the failure value; read the OS error before closing the handle.
    let outcome = if previous == u32::MAX {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    };

    // SAFETY: `thread` is a valid handle that is not used after this point.
    unsafe { CloseHandle(thread) };
    outcome
}
/// Returns the process ids that a `JobObjectBasicProcessIdList` query reports
/// for the job `handle`.
///
/// The query is retried with a larger buffer whenever it fails with
/// `ERROR_MORE_DATA`; any other failure is returned as the last OS error.
#[cfg(feature = "process-control")]
fn job_member_pids(handle: HANDLE) -> io::Result<Vec<u32>> {
// Number of id slots to make room for on the first attempt.
let mut cap: usize = 64;
loop {
// Size of the list structure plus `cap - 1` further `usize` slots —
// presumably the structure already contains the first id slot; confirm
// against the windows-sys definition.
let bytes = std::mem::size_of::<JOBOBJECT_BASIC_PROCESS_ID_LIST>()
+ cap.saturating_sub(1) * std::mem::size_of::<usize>();
// Allocated as `u64`s (length rounded up) rather than bytes —
// NOTE(review): presumably for the alignment the list structure needs; verify.
let mut buf = vec![0u64; bytes.div_ceil(std::mem::size_of::<u64>())];
let ok = unsafe {
QueryInformationJobObject(
handle,
JobObjectBasicProcessIdList,
buf.as_mut_ptr().cast(),
bytes as u32,
std::ptr::null_mut(),
)
};
// Reinterpret the start of the buffer as the list header.
let list = buf.as_ptr().cast::<JOBOBJECT_BASIC_PROCESS_ID_LIST>();
if ok == 0 {
let err = io::Error::last_os_error();
if err.raw_os_error() == Some(ERROR_MORE_DATA as i32) {
// Retry with twice the larger of the current capacity and the
// assigned-process count read back from the buffer.
let assigned = unsafe { (*list).NumberOfAssignedProcesses } as usize;
cap = assigned.max(cap).saturating_mul(2);
continue;
}
return Err(err);
}
let n = unsafe { (*list).NumberOfProcessIdsInList } as usize;
// View `n` ids starting at the first `ProcessIdList` element.
let ids =
unsafe { std::slice::from_raw_parts(std::ptr::addr_of!((*list).ProcessIdList[0]), n) };
// Each id is narrowed to `u32` with `as`.
return Ok(ids.iter().map(|&pid| pid as u32).collect());
}
}
/// Joins the two 32-bit halves of `ft` into one 64-bit value and multiplies it by 100,
/// saturating at `u64::MAX`.
#[cfg(feature = "stats")]
fn filetime_nanos(ft: FILETIME) -> u64 {
    let high = u64::from(ft.dwHighDateTime);
    let low = u64::from(ft.dwLowDateTime);
    let ticks = (high << 32) | low;
    ticks.saturating_mul(100)
}
#[cfg(feature = "limits")]
/// Converts a quota of `cores` on a machine with `cpus` CPUs into a rate where
/// `10_000` stands for all CPUs together.
///
/// The scaled value is rounded and clamped to `1..=10_000`, so a tiny or negative quota
/// yields 1 and an oversized one yields 10_000. A NaN input passes through `clamp`
/// unchanged and becomes 0 in the final cast.
fn cpu_hard_cap_rate(cores: f64, cpus: f64) -> u32 {
    const FULL_RATE: f64 = 10_000.0;
    const MIN_RATE: f64 = 1.0;

    let share_of_machine = cores / cpus;
    let scaled = (share_of_machine * FULL_RATE).round();
    scaled.clamp(MIN_RATE, FULL_RATE) as u32
}
/// Gathers best-effort metrics for process `pid`.
///
/// Each field is filled only when its query succeeds: `cpu_time` from the kernel and user
/// times, `peak_memory_bytes` from the peak working-set size. When the process cannot be
/// opened the default metrics are returned.
#[cfg(feature = "stats")]
pub(crate) fn process_metrics(pid: u32) -> ProcMetrics {
    let mut metrics = ProcMetrics::default();

    let process = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if process.is_null() {
        return metrics;
    }

    let zero = FILETIME {
        dwLowDateTime: 0,
        dwHighDateTime: 0,
    };
    let (mut creation, mut exit, mut kernel, mut user) = (zero, zero, zero, zero);
    // SAFETY: `process` was checked to be non-null and all four outputs are live locals.
    let got_times =
        unsafe { GetProcessTimes(process, &mut creation, &mut exit, &mut kernel, &mut user) };
    if got_times != 0 {
        let cpu_nanos = filetime_nanos(kernel).saturating_add(filetime_nanos(user));
        metrics.cpu_time = Some(Duration::from_nanos(cpu_nanos));
    }

    // SAFETY: zero-initialised output buffer; `cb` is set to its size before the call.
    let mut memory: PROCESS_MEMORY_COUNTERS = unsafe { std::mem::zeroed() };
    memory.cb = std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32;
    // SAFETY: `process` is still open and `memory` is a live local of the stated size.
    let got_memory = unsafe { K32GetProcessMemoryInfo(process, &mut memory, memory.cb) };
    if got_memory != 0 {
        metrics.peak_memory_bytes = Some(memory.PeakWorkingSetSize as u64);
    }

    // SAFETY: `process` is a valid handle that is not used after this point.
    unsafe { CloseHandle(process) };
    metrics
}
impl Drop for Job {
    /// Closes the job handle.
    ///
    /// When `skip_drop_kill` is set, the job's extended limits (and, with the `limits`
    /// feature, its CPU-rate control) are first overwritten with zeroed structures, which
    /// removes the flags that `Job::new` set. Failures of those calls are ignored.
    fn drop(&mut self) {
        if self.skip_drop_kill.is_set() {
            // SAFETY: an all-zero structure, i.e. no limit flags set.
            let cleared_limits: JOBOBJECT_EXTENDED_LIMIT_INFORMATION =
                unsafe { std::mem::zeroed() };
            let limits_len = std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32;
            // SAFETY: the pointer refers to the live local and the length is its size.
            let _ = unsafe {
                SetInformationJobObject(
                    self.handle,
                    JobObjectExtendedLimitInformation,
                    std::ptr::from_ref(&cleared_limits).cast(),
                    limits_len,
                )
            };

            #[cfg(feature = "limits")]
            {
                // SAFETY: an all-zero structure, i.e. no control flags set.
                let cleared_rate: JOBOBJECT_CPU_RATE_CONTROL_INFORMATION =
                    unsafe { std::mem::zeroed() };
                let rate_len =
                    std::mem::size_of::<JOBOBJECT_CPU_RATE_CONTROL_INFORMATION>() as u32;
                // SAFETY: the pointer refers to the live local and the length is its size.
                let _ = unsafe {
                    SetInformationJobObject(
                        self.handle,
                        JobObjectCpuRateControlInformation,
                        std::ptr::from_ref(&cleared_rate).cast(),
                        rate_len,
                    )
                };
            }
        }

        // SAFETY: `self.handle` is owned by this `Job` and is never used after drop.
        unsafe { CloseHandle(self.handle) };
    }
}
// Exercises `suspend_or_resume_thread` with a thread id that is not expected to be openable.
#[cfg(all(test, feature = "process-control"))]
mod thread_tests {
use super::suspend_or_resume_thread;
// NOTE(review): assumes thread id 1 cannot be opened on the test machine and
// that the failure is `ERROR_INVALID_PARAMETER`, so both calls take the early
// `Ok(())` path — confirm this holds on CI.
#[test]
fn suspend_or_resume_a_stale_tid_is_ok() {
assert!(suspend_or_resume_thread(1, true).is_ok());
assert!(suspend_or_resume_thread(1, false).is_ok());
}
}
#[cfg(test)]
mod guard_tests {
    /// Reports whether `pid` can be opened and its exit code still equals
    /// `STILL_ACTIVE` (259).
    fn pid_alive(pid: u32) -> bool {
        use windows_sys::Win32::Foundation::CloseHandle;
        use windows_sys::Win32::System::Threading::{
            GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
        };
        const STILL_ACTIVE: u32 = 259;

        let process = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
        if process.is_null() {
            return false;
        }

        let mut exit_code = 0u32;
        // SAFETY: `process` was checked to be non-null and `exit_code` is a live local.
        let queried = unsafe { GetExitCodeProcess(process, &mut exit_code) };
        // SAFETY: `process` is a valid handle that is not used after this point.
        unsafe { CloseHandle(process) };

        queried != 0 && exit_code == STILL_ACTIVE
    }

    /// Spawns a `cmd /C ping` child with `CREATE_SUSPENDED`, panicking if the spawn fails.
    fn spawn_suspended() -> tokio::process::Child {
        use windows_sys::Win32::System::Threading::CREATE_SUSPENDED;

        let mut command = tokio::process::Command::new("cmd");
        command.args(["/C", "ping -n 30 127.0.0.1 > NUL"]);
        command.creation_flags(CREATE_SUSPENDED);
        command.spawn().expect("spawn the suspended child")
    }

    #[tokio::test]
    #[ignore = "spawns a real subprocess"]
    async fn uncontained_guard_reaps_the_child_on_an_armed_drop() {
        let child = spawn_suspended();
        let pid = child.id().expect("the child has a pid");
        assert!(
            pid_alive(pid),
            "the suspended child is alive right after spawn"
        );

        // Dropping an armed guard is the behaviour under test.
        let guard = super::UncontainedChildGuard::arm(child);
        drop(guard);

        // Poll up to 200 times, 10 ms apart, for the process to go away.
        let mut terminated = false;
        for _attempt in 0..200 {
            terminated = !pid_alive(pid);
            if terminated {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        assert!(terminated, "an armed guard drop must terminate the child");
    }

    #[tokio::test]
    #[ignore = "spawns a real subprocess"]
    async fn uncontained_guard_disarm_hands_back_a_live_child() {
        let child = spawn_suspended();
        let pid = child.id().expect("the child has a pid");

        let guard = super::UncontainedChildGuard::arm(child);
        let mut kept = guard.disarm();
        assert!(pid_alive(pid), "disarm must leave the child running");

        // Clean up the still-suspended child ourselves.
        let _ = kept.start_kill();
        let _ = kept.wait().await;
    }
}
// Unit tests for `cpu_hard_cap_rate`.
#[cfg(all(test, feature = "limits"))]
mod tests {
use super::cpu_hard_cap_rate;
#[test]
fn cpu_rate_maps_per_core_fraction_to_total_system_percent() {
// Half a core out of eight: 0.5 / 8 * 10_000 = 625.
assert_eq!(cpu_hard_cap_rate(0.5, 8.0), 625);
// A quota equal to the CPU count maps to the full 10_000.
assert_eq!(cpu_hard_cap_rate(1.0, 1.0), 10_000);
assert_eq!(cpu_hard_cap_rate(4.0, 4.0), 10_000);
// A quota above the CPU count is clamped to 10_000.
assert_eq!(cpu_hard_cap_rate(8.0, 4.0), 10_000);
// A quota that rounds to zero is clamped up to 1.
assert_eq!(cpu_hard_cap_rate(0.0001, 64.0), 1);
}
}