use std::io;
use std::time::Duration;
use tokio::process::{Child, Command};
#[cfg(feature = "process-control")]
use windows_sys::Win32::Foundation::ERROR_MORE_DATA;
#[cfg(feature = "stats")]
use windows_sys::Win32::Foundation::FILETIME;
use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE};
use windows_sys::Win32::System::Diagnostics::ToolHelp::{
CreateToolhelp32Snapshot, TH32CS_SNAPTHREAD, THREADENTRY32, Thread32First, Thread32Next,
};
#[cfg(any(feature = "process-control", feature = "stats"))]
use windows_sys::Win32::System::JobObjects::QueryInformationJobObject;
use windows_sys::Win32::System::JobObjects::{
AssignProcessToJobObject, CreateJobObjectW, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JobObjectExtendedLimitInformation,
SetInformationJobObject, TerminateJobObject,
};
#[cfg(feature = "limits")]
use windows_sys::Win32::System::JobObjects::{
JOB_OBJECT_CPU_RATE_CONTROL_ENABLE, JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP,
JOB_OBJECT_LIMIT_ACTIVE_PROCESS, JOB_OBJECT_LIMIT_JOB_MEMORY,
JOBOBJECT_CPU_RATE_CONTROL_INFORMATION, JobObjectCpuRateControlInformation,
};
#[cfg(feature = "stats")]
use windows_sys::Win32::System::JobObjects::{
JOBOBJECT_BASIC_ACCOUNTING_INFORMATION, JobObjectBasicAccountingInformation,
};
#[cfg(feature = "process-control")]
use windows_sys::Win32::System::JobObjects::{
JOBOBJECT_BASIC_PROCESS_ID_LIST, JobObjectBasicProcessIdList,
};
#[cfg(feature = "stats")]
use windows_sys::Win32::System::ProcessStatus::{K32GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};
#[cfg(feature = "process-control")]
use windows_sys::Win32::System::Threading::SuspendThread;
use windows_sys::Win32::System::Threading::{
CREATE_SUSPENDED, OpenThread, ResumeThread, THREAD_SUSPEND_RESUME,
};
#[cfg(feature = "stats")]
use windows_sys::Win32::System::Threading::{
GetProcessTimes, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
};
use crate::Mechanism;
#[cfg(feature = "process-control")]
use crate::Signal;
#[cfg(feature = "limits")]
use crate::limits::ResourceLimits;
#[cfg(feature = "stats")]
use crate::stats::ProcessGroupStats;
#[cfg(feature = "stats")]
use crate::sys::ProcMetrics;
/// Owns a Win32 job object handle that groups the processes spawned into or
/// adopted by it.
pub(crate) struct Job {
// Raw job handle returned by `CreateJobObjectW`; only ever read after
// construction and closed in `Drop`.
handle: HANDLE,
// Serializes thread suspend/resume passes with `spawn`'s
// assign-then-resume sequence so the two never interleave. Guards no data.
suspend_lock: std::sync::Mutex<()>,
}
// SAFETY: `HANDLE` is a raw pointer, so `Send`/`Sync` are not derived
// automatically. The job handle is never mutated after construction and is
// only passed by value to Win32 calls. Kernel handles are usable from any
// thread. The only other field is a `Mutex`.
unsafe impl Send for Job {}
unsafe impl Sync for Job {}
impl Job {
/// Creates an unnamed job object configured with
/// `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`. When the `limits` feature is
/// enabled, it also applies the memory, active-process and CPU hard-cap
/// limits requested in `limits`.
///
/// # Errors
/// Returns the OS error if the job cannot be created or any limit cannot be
/// applied. The handle is wrapped in `Job` before any limit is set, so an
/// early return drops it and closes the handle.
pub(crate) fn new(#[cfg(feature = "limits")] limits: &ResourceLimits) -> io::Result<Self> {
// Null attributes and null name: default security, unnamed job.
let handle = unsafe { CreateJobObjectW(std::ptr::null(), std::ptr::null()) };
if handle.is_null() {
return Err(io::Error::last_os_error());
}
let job = Job {
handle,
suspend_lock: std::sync::Mutex::new(()),
};
// Start from an all-zero limit block so only the flags set below take effect.
let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { std::mem::zeroed() };
info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
#[cfg(feature = "limits")]
{
if let Some(bytes) = limits.memory_max {
info.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
// Saturate instead of failing when the byte count does not fit in `usize`.
info.JobMemoryLimit = usize::try_from(bytes).unwrap_or(usize::MAX);
}
if let Some(n) = limits.max_processes {
info.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_ACTIVE_PROCESS;
info.BasicLimitInformation.ActiveProcessLimit = n;
}
}
let ok = unsafe {
SetInformationJobObject(
job.handle,
JobObjectExtendedLimitInformation,
std::ptr::from_ref(&info).cast(),
std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
)
};
if ok == 0 {
return Err(io::Error::last_os_error());
}
// The CPU cap is a separate information class, applied after the extended limits.
#[cfg(feature = "limits")]
if let Some(cores) = limits.cpu_quota {
// Fall back to a single CPU if the parallelism query fails.
let cpus = std::thread::available_parallelism().map_or(1.0, |n| n.get() as f64);
let rate = cpu_hard_cap_rate(cores, cpus);
let mut cpu: JOBOBJECT_CPU_RATE_CONTROL_INFORMATION = unsafe { std::mem::zeroed() };
cpu.ControlFlags =
JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
cpu.Anonymous.CpuRate = rate;
let ok = unsafe {
SetInformationJobObject(
job.handle,
JobObjectCpuRateControlInformation,
std::ptr::from_ref(&cpu).cast(),
std::mem::size_of::<JOBOBJECT_CPU_RATE_CONTROL_INFORMATION>() as u32,
)
};
if ok == 0 {
return Err(io::Error::last_os_error());
}
}
Ok(job)
}
/// Spawns `cmd` as a member of this job.
///
/// The child is created with `CREATE_SUSPENDED` OR-ed with
/// `opts.creation_flags`. It is then assigned to the job, and only after that
/// are its threads resumed, so the child does not start running before it is
/// a job member. Note that this overwrites the creation flags stored on the
/// caller's `cmd`.
///
/// # Errors
/// Returns the spawn error, or the OS error from assignment or resumption.
/// In the latter two cases `start_kill` is requested on the child first.
pub(crate) fn spawn(
&self,
cmd: &mut Command,
opts: &crate::sys::SpawnOptions,
) -> io::Result<Child> {
use std::os::windows::process::CommandExt;
cmd.as_std_mut()
.creation_flags(CREATE_SUSPENDED | opts.creation_flags);
let mut child = cmd.spawn()?;
// NOTE(review): unlike the failures further down, these two early returns
// do not call `start_kill` on the still-suspended child.
let pid = child.id().ok_or_else(|| {
io::Error::other("child exited before it could be assigned to the job")
})?;
let handle = child.raw_handle().ok_or_else(|| {
io::Error::other("child exited before it could be assigned to the job")
})?;
// Hold the suspend lock across assign+resume so a concurrent suspend/resume
// pass cannot interleave with it. The mutex guards `()`, so a poisoned
// lock is still safe to take.
let _guard = self
.suspend_lock
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let ok = unsafe { AssignProcessToJobObject(self.handle, handle as HANDLE) };
if ok == 0 {
let err = io::Error::last_os_error();
let _ = child.start_kill();
return Err(err);
}
if let Err(err) = resume_process_threads(pid) {
let _ = child.start_kill();
return Err(err);
}
Ok(child)
}
/// Assigns an existing `child` to this job without suspending or resuming it.
///
/// # Errors
/// Fails if the child has no raw handle or `AssignProcessToJobObject` fails.
#[cfg(feature = "process-control")]
pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
let handle = child
.raw_handle()
.ok_or_else(|| io::Error::other("child has no handle (already exited?)"))?;
let ok = unsafe { AssignProcessToJobObject(self.handle, handle as HANDLE) };
if ok == 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
/// Terminates every process in the job via `TerminateJobObject`, using exit code 1.
pub(crate) fn kill_all(&self) -> io::Result<()> {
let ok = unsafe { TerminateJobObject(self.handle, 1) };
if ok == 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
/// Delivers `sig` to the job. Only `Signal::Kill` (mapped to `kill_all`) is
/// supported; every other signal yields `io::ErrorKind::Unsupported`.
#[cfg(feature = "process-control")]
pub(crate) fn signal(&self, sig: Signal) -> io::Result<()> {
match sig {
Signal::Kill => self.kill_all(),
other => Err(io::Error::new(
io::ErrorKind::Unsupported,
format!("signal({other:?})"),
)),
}
}
/// Suspends every thread owned by a current job member.
#[cfg(feature = "process-control")]
pub(crate) fn suspend(&self) -> io::Result<()> {
self.for_each_member_thread(true)
}
/// Resumes every thread owned by a current job member.
#[cfg(feature = "process-control")]
pub(crate) fn resume(&self) -> io::Result<()> {
self.for_each_member_thread(false)
}
/// Returns the PIDs currently listed as members of the job.
#[cfg(feature = "process-control")]
pub(crate) fn members(&self) -> io::Result<Vec<u32>> {
job_member_pids(self.handle)
}
/// Suspends (`suspend == true`) or resumes every thread whose owning PID is a
/// job member.
///
/// Threads are found through a single Toolhelp thread snapshot, so threads
/// created after the snapshot is taken are not touched. Every matching
/// thread is attempted. If any attempt fails, the last error is returned.
/// `suspend_lock` is held for the whole pass.
#[cfg(feature = "process-control")]
fn for_each_member_thread(&self, suspend: bool) -> io::Result<()> {
let _guard = self
.suspend_lock
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
// Hash set for O(1) membership checks while walking every thread on the system.
let members: std::collections::HashSet<u32> =
job_member_pids(self.handle)?.into_iter().collect();
// Empty job: nothing to do, and no snapshot needs to be taken.
if members.is_empty() {
return Ok(());
}
let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0) };
if snapshot == INVALID_HANDLE_VALUE {
return Err(io::Error::last_os_error());
}
// `dwSize` must be set before the first `Thread32First` call.
let mut entry: THREADENTRY32 = unsafe { std::mem::zeroed() };
entry.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;
let mut last_err = None;
let mut ok = unsafe { Thread32First(snapshot, &mut entry) };
while ok != 0 {
if members.contains(&entry.th32OwnerProcessID)
&& let Err(err) = suspend_or_resume_thread(entry.th32ThreadID, suspend)
{
last_err = Some(err);
}
ok = unsafe { Thread32Next(snapshot, &mut entry) };
}
unsafe { CloseHandle(snapshot) };
match last_err {
Some(err) => Err(err),
None => Ok(()),
}
}
/// Shuts the job down. This backend has no graceful path: `_signal`,
/// `_timeout` and `_escalate` are ignored, and the job is killed immediately
/// via `kill_all`.
pub(crate) async fn graceful_shutdown(
&self,
_signal: i32,
_timeout: Duration,
_escalate: bool,
) -> io::Result<()> {
self.kill_all()
}
/// Returns job-wide accounting: the active process count, total user+kernel
/// CPU time, and peak job memory.
///
/// # Errors
/// Returns the OS error if either `QueryInformationJobObject` call fails.
#[cfg(feature = "stats")]
pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
let mut acct: JOBOBJECT_BASIC_ACCOUNTING_INFORMATION = unsafe { std::mem::zeroed() };
let ok = unsafe {
QueryInformationJobObject(
self.handle,
JobObjectBasicAccountingInformation,
std::ptr::from_mut(&mut acct).cast(),
std::mem::size_of::<JOBOBJECT_BASIC_ACCOUNTING_INFORMATION>() as u32,
std::ptr::null_mut(),
)
};
if ok == 0 {
return Err(io::Error::last_os_error());
}
// The extended limit block carries `PeakJobMemoryUsed`.
let mut ext: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { std::mem::zeroed() };
let ok = unsafe {
QueryInformationJobObject(
self.handle,
JobObjectExtendedLimitInformation,
std::ptr::from_mut(&mut ext).cast(),
std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
std::ptr::null_mut(),
)
};
if ok == 0 {
return Err(io::Error::last_os_error());
}
// Job times are in 100 ns ticks: sum them, then scale to nanoseconds, saturating.
let cpu_100ns = (acct.TotalUserTime as u64).saturating_add(acct.TotalKernelTime as u64);
Ok(ProcessGroupStats {
active_process_count: acct.ActiveProcesses as usize,
total_cpu_time: Some(Duration::from_nanos(cpu_100ns.saturating_mul(100))),
peak_memory_bytes: Some(ext.PeakJobMemoryUsed as u64),
})
}
/// Identifies the containment mechanism used by this backend.
pub(crate) fn mechanism(&self) -> Mechanism {
Mechanism::JobObject
}
}
/// Resumes every thread owned by process `pid`, as found in a Toolhelp thread
/// snapshot.
///
/// Succeeds as soon as at least one thread was resumed.
///
/// # Errors
/// Fails if the snapshot cannot be taken, or if no thread was resumed. In the
/// latter case it returns the last per-thread error if there was one, or a
/// "no thread found" error otherwise.
fn resume_process_threads(pid: u32) -> io::Result<()> {
    // SAFETY: no pointer arguments; failure is reported via INVALID_HANDLE_VALUE.
    let snap = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0) };
    if snap == INVALID_HANDLE_VALUE {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: THREADENTRY32 is plain data; all-zero is a valid bit pattern.
    let mut te: THREADENTRY32 = unsafe { std::mem::zeroed() };
    te.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;

    let mut resumed_any = false;
    let mut failure: Option<io::Error> = None;

    // SAFETY: `snap` is a live snapshot handle and `te` has `dwSize` set.
    let mut more = unsafe { Thread32First(snap, &mut te) } != 0;
    while more {
        if te.th32OwnerProcessID == pid {
            if let Err(e) = resume_thread(te.th32ThreadID) {
                failure = Some(e);
            } else {
                resumed_any = true;
            }
        }
        // SAFETY: same handle and entry as above.
        more = unsafe { Thread32Next(snap, &mut te) } != 0;
    }

    // SAFETY: `snap` is owned here and closed exactly once.
    unsafe { CloseHandle(snap) };

    if resumed_any {
        return Ok(());
    }
    Err(failure.unwrap_or_else(|| io::Error::other("no thread found to resume the contained child")))
}
/// Resumes thread `tid` once by calling `ResumeThread` on it.
///
/// # Errors
/// Returns the OS error if the thread cannot be opened with
/// `THREAD_SUSPEND_RESUME` or if `ResumeThread` fails.
fn resume_thread(tid: u32) -> io::Result<()> {
    // SAFETY: no pointer arguments; a null return signals failure.
    let thread = unsafe { OpenThread(THREAD_SUSPEND_RESUME, 0, tid) };
    if thread.is_null() {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: `thread` was opened above with THREAD_SUSPEND_RESUME access.
    let prev = unsafe { ResumeThread(thread) };
    // Read the thread-local last-error value *before* `CloseHandle`. That is
    // another Win32 call, and it is not guaranteed to leave the value
    // describing the `ResumeThread` failure intact.
    let result = if prev == u32::MAX {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    };
    // SAFETY: `thread` is owned by this function and closed exactly once.
    unsafe { CloseHandle(thread) };
    result
}
/// Suspends (`suspend == true`) or resumes (`suspend == false`) thread `tid` once.
///
/// # Errors
/// Returns the OS error if the thread cannot be opened with
/// `THREAD_SUSPEND_RESUME` or if `SuspendThread`/`ResumeThread` fails.
#[cfg(feature = "process-control")]
fn suspend_or_resume_thread(tid: u32, suspend: bool) -> io::Result<()> {
    // SAFETY: no pointer arguments; a null return signals failure.
    let thread = unsafe { OpenThread(THREAD_SUSPEND_RESUME, 0, tid) };
    if thread.is_null() {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: `thread` was opened above with THREAD_SUSPEND_RESUME access.
    let prev = unsafe {
        if suspend {
            SuspendThread(thread)
        } else {
            ResumeThread(thread)
        }
    };
    // Capture the failure's last-error value before `CloseHandle` gets a
    // chance to overwrite it.
    let result = if prev == u32::MAX {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    };
    // SAFETY: `thread` is owned by this function and closed exactly once.
    unsafe { CloseHandle(thread) };
    result
}
/// Returns the PIDs of the processes currently listed in the job `handle`.
///
/// The function queries `JobObjectBasicProcessIdList` into a buffer sized for
/// `cap` entries. While the OS reports `ERROR_MORE_DATA`, it grows the buffer
/// to at least twice the reported assigned-process count and retries.
///
/// # Errors
/// Returns the OS error from `QueryInformationJobObject`. Returns an
/// `io::Error::other` if the required buffer size overflows `usize` or `u32`.
#[cfg(feature = "process-control")]
fn job_member_pids(handle: HANDLE) -> io::Result<Vec<u32>> {
    const HEADER: usize = std::mem::size_of::<JOBOBJECT_BASIC_PROCESS_ID_LIST>();
    const ENTRY: usize = std::mem::size_of::<usize>();
    let mut cap: usize = 64;
    loop {
        // The struct already embeds one `ProcessIdList` slot; add room for the rest.
        let bytes = cap
            .saturating_sub(1)
            .checked_mul(ENTRY)
            .and_then(|extra| extra.checked_add(HEADER))
            .ok_or_else(|| io::Error::other("job process id list buffer size overflow"))?;
        let bytes_u32 = u32::try_from(bytes)
            .map_err(|_| io::Error::other("job process id list buffer size overflow"))?;
        // A `u64` backing store keeps the buffer aligned for the struct's `usize` list.
        let mut buf = vec![0u64; bytes.div_ceil(std::mem::size_of::<u64>())];
        // Use one raw pointer for both the OS write and the reads below.
        let list = buf.as_mut_ptr().cast::<JOBOBJECT_BASIC_PROCESS_ID_LIST>();
        // SAFETY: `list` points to a zeroed, writable, aligned buffer of at least
        // `bytes_u32` bytes that stays alive for the duration of the call.
        let ok = unsafe {
            QueryInformationJobObject(
                handle,
                JobObjectBasicProcessIdList,
                list.cast(),
                bytes_u32,
                std::ptr::null_mut(),
            )
        };
        if ok == 0 {
            let err = io::Error::last_os_error();
            if err.raw_os_error() == Some(ERROR_MORE_DATA as i32) {
                // SAFETY: the header lies inside `buf`; reading a field through the
                // raw pointer creates no intermediate reference.
                let assigned = unsafe { (*list).NumberOfAssignedProcesses } as usize;
                cap = assigned.max(cap).saturating_mul(2);
                continue;
            }
            return Err(err);
        }
        // SAFETY: as above, a plain field read through the raw pointer.
        let reported = unsafe { (*list).NumberOfProcessIdsInList } as usize;
        // Never read past the entries we actually allocated.
        let n = reported.min(cap);
        // `addr_of!` keeps the provenance of the whole buffer. By contrast,
        // `ProcessIdList.as_ptr()` would borrow the declared one-element array
        // and then read beyond it.
        // SAFETY: `list` is valid and aligned; no reference is created.
        let first = unsafe { std::ptr::addr_of!((*list).ProcessIdList) }.cast::<usize>();
        // SAFETY: room for `cap >= n` initialized `usize` entries starts at
        // `first`, and `buf` is alive for the lifetime of `ids`.
        let ids = unsafe { std::slice::from_raw_parts(first, n) };
        return Ok(ids.iter().map(|&pid| pid as u32).collect());
    }
}
/// Interprets a `FILETIME` as a count of 100 ns ticks and returns it in
/// nanoseconds, saturating at `u64::MAX`.
#[cfg(feature = "stats")]
fn filetime_nanos(ft: FILETIME) -> u64 {
    let high = u64::from(ft.dwHighDateTime);
    let low = u64::from(ft.dwLowDateTime);
    let ticks = (high << 32) | low;
    ticks.saturating_mul(100)
}
/// Converts a CPU quota measured in cores into a job-object hard-cap rate.
///
/// The rate is expressed as cycles per 10,000 across all `cpus`. It is rounded
/// to the nearest integer and clamped into `1..=10_000`.
#[cfg(feature = "limits")]
fn cpu_hard_cap_rate(cores: f64, cpus: f64) -> u32 {
    const FULL: f64 = 10_000.0;
    let fraction = cores / cpus;
    let scaled = (fraction * FULL).round();
    scaled.clamp(1.0, FULL) as u32
}
/// Samples CPU time (kernel + user) and peak working-set size for process `pid`.
///
/// This is best-effort. Any field stays at its `ProcMetrics::default()` value
/// if the process cannot be opened or the corresponding query fails.
#[cfg(feature = "stats")]
pub(crate) fn process_metrics(pid: u32) -> ProcMetrics {
    let mut metrics = ProcMetrics::default();
    // SAFETY: no pointer arguments; a null return means the process could not be opened.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle.is_null() {
        return metrics;
    }

    let zero = FILETIME {
        dwLowDateTime: 0,
        dwHighDateTime: 0,
    };
    let (mut creation, mut exit, mut kernel, mut user) = (zero, zero, zero, zero);
    // SAFETY: `handle` is valid and all four out-pointers reference live locals.
    let ok = unsafe { GetProcessTimes(handle, &mut creation, &mut exit, &mut kernel, &mut user) };
    if ok != 0 {
        // Both terms may already be saturated; add with saturation as well so an
        // extreme reading cannot overflow (which would panic in debug builds).
        metrics.cpu_time = Some(Duration::from_nanos(
            filetime_nanos(kernel).saturating_add(filetime_nanos(user)),
        ));
    }

    let size = std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32;
    // SAFETY: PROCESS_MEMORY_COUNTERS is plain data; all-zero is valid.
    let mut counters: PROCESS_MEMORY_COUNTERS = unsafe { std::mem::zeroed() };
    counters.cb = size;
    // SAFETY: `handle` is valid and `counters` is a live, correctly sized out-buffer.
    let ok = unsafe { K32GetProcessMemoryInfo(handle, &mut counters, size) };
    if ok != 0 {
        metrics.peak_memory_bytes = Some(counters.PeakWorkingSetSize as u64);
    }

    // SAFETY: `handle` is owned here and closed exactly once.
    unsafe { CloseHandle(handle) };
    metrics
}
impl Drop for Job {
    /// Closes the job handle. The job is created with
    /// `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, so per the Win32 docs, closing
    /// its last handle terminates any remaining members.
    fn drop(&mut self) {
        // SAFETY: `self.handle` is the job handle owned by this value; it is
        // closed exactly once, here.
        let _ = unsafe { CloseHandle(self.handle) };
    }
}
#[cfg(all(test, feature = "limits"))]
mod tests {
    use super::cpu_hard_cap_rate;

    #[test]
    fn cpu_rate_maps_per_core_fraction_to_total_system_percent() {
        // (cores requested, logical CPUs, expected rate out of 10_000)
        let cases = [
            (0.5, 8.0, 625),
            (1.0, 1.0, 10_000),
            (4.0, 4.0, 10_000),
            (8.0, 4.0, 10_000),
            (0.0001, 64.0, 1),
        ];
        for (cores, cpus, expected) in cases {
            assert_eq!(
                cpu_hard_cap_rate(cores, cpus),
                expected,
                "cores={cores}, cpus={cpus}"
            );
        }
    }
}