use std::ffi::{CStr, CString};
use std::io;
use std::os::unix::ffi::OsStringExt;
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::time::{Instant, sleep};
use crate::Mechanism;
use crate::stats::ProcessGroupStats;
use crate::sys::ProcMetrics;
use crate::sys::pgroup::ProcessGroup;
/// Pause between successive "is the cgroup empty yet?" checks while
/// `Job::graceful_shutdown` waits for members to exit after `SIGTERM`.
const POLL_INTERVAL: Duration = Duration::from_millis(20);
/// Per-process counter consumed by `Cgroup::create`; combined with the process
/// id it makes each `processkit-<pid>-<n>` directory name distinct within this
/// process.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
/// A group of child processes that can be spawned into, adopted, signalled,
/// measured and killed as one unit.
///
/// The containment mechanism is picked once in `Job::new`. Dropping a
/// cgroup-backed job kills its members and tries to remove the cgroup
/// directory (see the `Drop` impl).
pub(crate) struct Job {
/// The mechanism selected in `Job::new`; every method dispatches on it.
backend: Backend,
}
/// Containment mechanism behind a `Job`.
enum Backend {
/// A dedicated cgroup directory; used when `Cgroup::create` succeeds.
Cgroup(Cgroup),
/// Fallback used by `Job::new` when the cgroup could not be created.
ProcessGroup(ProcessGroup),
}
impl Job {
pub(crate) fn new() -> io::Result<Self> {
let backend = match Cgroup::create() {
Ok(cg) => Backend::Cgroup(cg),
Err(_) => Backend::ProcessGroup(ProcessGroup::new()),
};
Ok(Job { backend })
}
pub(crate) fn spawn(&self, cmd: &mut Command) -> io::Result<Child> {
match &self.backend {
Backend::Cgroup(cg) => {
let procs = CString::new(cg.path.join("cgroup.procs").into_os_string().into_vec())
.map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, "cgroup path contains NUL")
})?;
unsafe {
cmd.as_std_mut()
.pre_exec(move || write_self_pid(procs.as_c_str()));
}
cmd.spawn()
}
Backend::ProcessGroup(pg) => pg.spawn(cmd),
}
}
pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
let pid = child
.id()
.ok_or_else(|| io::Error::other("child has no pid (already exited?)"))?
as i32;
match &self.backend {
Backend::Cgroup(cg) => {
std::fs::write(cg.path.join("cgroup.procs"), pid.to_string().as_bytes())
}
Backend::ProcessGroup(pg) => pg.adopt(child),
}
}
pub(crate) fn kill_all(&self) -> io::Result<()> {
match &self.backend {
Backend::Cgroup(cg) => cg.kill(),
Backend::ProcessGroup(pg) => pg.kill_all(),
}
}
pub(crate) async fn graceful_shutdown(
&self,
timeout: Duration,
escalate: bool,
) -> io::Result<()> {
match &self.backend {
Backend::Cgroup(cg) => {
cg.signal(libc::SIGTERM);
let deadline = Instant::now() + timeout;
while !cg.is_empty() {
if Instant::now() >= deadline {
break;
}
sleep(POLL_INTERVAL).await;
}
if escalate && !cg.is_empty() {
cg.kill()?;
}
Ok(())
}
Backend::ProcessGroup(pg) => pg.graceful_shutdown(timeout, escalate).await,
}
}
pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
match &self.backend {
Backend::Cgroup(cg) => {
let pids = cg.members();
let active = pids.len();
let mut cpu = Duration::ZERO;
let mut have_cpu = false;
let mut mem = 0u64;
let mut have_mem = false;
for pid in pids {
let m = process_metrics(pid as u32);
if let Some(c) = m.cpu_time {
cpu += c;
have_cpu = true;
}
if let Some(p) = m.peak_memory_bytes {
mem += p;
have_mem = true;
}
}
Ok(ProcessGroupStats {
active_process_count: active,
total_cpu_time: have_cpu.then_some(cpu),
peak_memory_bytes: have_mem.then_some(mem),
})
}
Backend::ProcessGroup(pg) => pg.stats(),
}
}
pub(crate) fn mechanism(&self) -> Mechanism {
match &self.backend {
Backend::Cgroup(_) => Mechanism::CgroupV2,
Backend::ProcessGroup(_) => Mechanism::ProcessGroup,
}
}
}
/// Collects CPU time and peak resident memory for `pid` from procfs.
///
/// Each field is set only when its source could be read and parsed; otherwise
/// it keeps the value from `ProcMetrics::default()`.
///
/// * `cpu_time` is `utime + stime` from `/proc/<pid>/stat`, converted from
///   clock ticks using `sysconf(_SC_CLK_TCK)`.
/// * `peak_memory_bytes` is the number on the `VmHWM:` line of
///   `/proc/<pid>/status`, multiplied by 1024.
pub(crate) fn process_metrics(pid: u32) -> ProcMetrics {
    // Total user + system CPU time, or `None` if any step fails.
    fn read_cpu_time(pid: u32) -> Option<Duration> {
        let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
        // Search from the right so a ')' inside the parenthesised command
        // name is not mistaken for the end of that field.
        let after_comm = &stat[stat.rfind(')')? + 1..];
        // Counting from the field after the command name, index 11 is utime
        // and index 12 is stime.
        let mut ticks = after_comm.split_whitespace().skip(11);
        let utime = ticks.next()?.parse::<u64>().ok()?;
        let stime = ticks.next()?.parse::<u64>().ok()?;
        // SAFETY: `sysconf` takes an integer selector and touches no memory
        // owned by this program.
        let hz = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
        if hz <= 0 {
            return None;
        }
        let nanos = (utime + stime) as u128 * 1_000_000_000u128 / hz as u128;
        Some(Duration::from_nanos(nanos as u64))
    }

    // Peak resident set size in bytes, or `None` if it cannot be determined.
    fn read_peak_memory(pid: u32) -> Option<u64> {
        let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
        // Only the first `VmHWM:` line is considered.
        let value = status
            .lines()
            .find_map(|line| line.strip_prefix("VmHWM:"))?;
        let kb = value.split_whitespace().next()?.parse::<u64>().ok()?;
        Some(kb * 1024)
    }

    let mut metrics = ProcMetrics::default();
    if let Some(cpu) = read_cpu_time(pid) {
        metrics.cpu_time = Some(cpu);
    }
    if let Some(peak) = read_peak_memory(pid) {
        metrics.peak_memory_bytes = Some(peak);
    }
    metrics
}
impl Drop for Job {
    /// Tears down a cgroup-backed job; process-group jobs need no cleanup here.
    ///
    /// The cgroup's members are killed (errors ignored), then the cgroup is
    /// polled up to 50 times at 2 ms intervals until it reports no members,
    /// and finally the directory is removed on a best-effort basis. This
    /// blocks the calling thread for up to roughly 100 ms.
    fn drop(&mut self) {
        match &self.backend {
            Backend::ProcessGroup(_) => {}
            Backend::Cgroup(cg) => {
                let _ = cg.kill();

                // Give the killed processes a moment to disappear before the
                // directory removal is attempted.
                let mut polls_left = 50u32;
                while polls_left > 0 && !cg.is_empty() {
                    std::thread::sleep(Duration::from_millis(2));
                    polls_left -= 1;
                }

                let _ = std::fs::remove_dir(&cg.path);
            }
        }
    }
}
/// Handle to one cgroup directory created by `Cgroup::create`.
struct Cgroup {
/// Absolute path of the cgroup directory; control files such as
/// `cgroup.procs` and `cgroup.kill` are addressed relative to it.
path: PathBuf,
}
impl Cgroup {
    /// Creates a new child cgroup beneath the cgroup this process lives in.
    ///
    /// The presence of `/sys/fs/cgroup/cgroup.controllers` is used as the
    /// indicator that cgroup v2 is mounted. The parent is taken from the `0::`
    /// line of `/proc/self/cgroup` (defaulting to the root when no such line
    /// exists), and the new directory is named `processkit-<pid>-<n>`.
    ///
    /// # Errors
    ///
    /// `Unsupported` when the v2 marker file is missing; otherwise any error
    /// from reading `/proc/self/cgroup` or creating the directory.
    fn create() -> io::Result<Self> {
        let root = Path::new("/sys/fs/cgroup");
        if !root.join("cgroup.controllers").exists() {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "cgroup v2 not mounted",
            ));
        }
        let self_cgroup = std::fs::read_to_string("/proc/self/cgroup")?;
        let rel = self_cgroup
            .lines()
            .find_map(|line| line.strip_prefix("0::"))
            .unwrap_or("/")
            .trim();
        // Strip the leading '/' so `join` appends instead of replacing `root`.
        let parent = root.join(rel.trim_start_matches('/'));
        let name = format!(
            "processkit-{}-{}",
            std::process::id(),
            NEXT_ID.fetch_add(1, Ordering::Relaxed)
        );
        let path = parent.join(name);
        std::fs::create_dir(&path)?;
        Ok(Cgroup { path })
    }

    /// Lists the PIDs currently recorded in `cgroup.procs`.
    ///
    /// Unparsable lines are skipped and an unreadable file yields an empty
    /// list. Only strictly positive PIDs are returned: callers pass these
    /// values to `kill(2)`, where `0` addresses the caller's own process group
    /// and negative values address other groups, so such an entry must never
    /// reach it. (The kernel is understood to print `0` for a task that is not
    /// visible in the reader's PID namespace — TODO confirm.)
    fn members(&self) -> Vec<i32> {
        match std::fs::read_to_string(self.path.join("cgroup.procs")) {
            Ok(procs) => procs
                .lines()
                .filter_map(|l| l.trim().parse::<i32>().ok())
                .filter(|&pid| pid > 0)
                .collect(),
            Err(_) => Vec::new(),
        }
    }

    /// Returns true when `members` reports no PIDs, which includes the case
    /// where `cgroup.procs` could not be read.
    fn is_empty(&self) -> bool {
        self.members().is_empty()
    }

    /// Sends `sig` to every current member; delivery failures are ignored.
    fn signal(&self, sig: i32) {
        for pid in self.members() {
            // SAFETY: `kill` only takes integers; `members` guarantees
            // `pid > 0`, so exactly one process is addressed.
            unsafe {
                libc::kill(pid, sig);
            }
        }
    }

    /// Kills every process in the cgroup.
    ///
    /// First tries writing `1` to `cgroup.kill`. If that write fails, falls
    /// back to sending `SIGKILL` to each listed member, re-listing up to 50
    /// times with a 2 ms blocking sleep between rounds so that processes
    /// appearing in the meantime are caught too. Always returns `Ok(())`,
    /// even if members remain after the last round.
    fn kill(&self) -> io::Result<()> {
        if std::fs::write(self.path.join("cgroup.kill"), b"1").is_ok() {
            return Ok(());
        }
        for _ in 0..50 {
            let members = self.members();
            if members.is_empty() {
                break;
            }
            for pid in members {
                // SAFETY: `kill` only takes integers; `members` guarantees
                // `pid > 0`, so exactly one process is addressed.
                unsafe {
                    libc::kill(pid, libc::SIGKILL);
                }
            }
            std::thread::sleep(Duration::from_millis(2));
        }
        Ok(())
    }
}
/// Writes the calling process's PID, as decimal text, to the file at `path`.
///
/// Used as a `pre_exec` hook, so it sticks to raw `open`/`getpid`/`write`/
/// `close` calls and a stack buffer and performs no heap allocation.
///
/// # Errors
///
/// Returns the OS error if the file cannot be opened or the write fails. The
/// result of `close` is ignored.
fn write_self_pid(path: &CStr) -> io::Result<()> {
    // SAFETY: `path` is a valid NUL-terminated string for the whole call.
    let fd = unsafe { libc::open(path.as_ptr(), libc::O_WRONLY | libc::O_CLOEXEC) };
    if fd < 0 {
        return Err(io::Error::last_os_error());
    }

    // Render the PID right-to-left into a fixed buffer (a u32 needs at most
    // 10 digits).
    let mut digits = [0u8; 12];
    let mut start = digits.len();
    // SAFETY: `getpid` has no preconditions.
    let mut rest = unsafe { libc::getpid() } as u32;
    loop {
        start -= 1;
        digits[start] = b'0' + (rest % 10) as u8;
        rest /= 10;
        if rest == 0 {
            break;
        }
    }
    let text = &digits[start..];

    // SAFETY: `fd` is the descriptor opened above and `text` points at
    // `text.len()` initialised bytes.
    let written = unsafe { libc::write(fd, text.as_ptr().cast(), text.len()) };
    // Capture errno before `close` has a chance to overwrite it.
    let outcome = if written < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    };
    // SAFETY: `fd` is open and owned by this function; it is not used again.
    unsafe {
        libc::close(fd);
    }
    outcome
}