use std::path::PathBuf;
use tokio::fs;
use tracing::{debug, warn};
use uuid::Uuid;
use crate::SandboxError;
/// Point-in-time resource usage snapshot of an agent cgroup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CgroupStats {
    /// Cumulative CPU time consumed by the cgroup, in nanoseconds
    /// (derived from the `usage_usec` line of `cpu.stat`).
    pub cpu_usage_ns: u64,
    /// Current memory usage of the cgroup in bytes (from `memory.current`).
    pub memory_current_bytes: u64,
}
/// Handle to a single agent's cgroup v2 directory.
///
/// The directory is cleaned up when the handle is dropped (see the `Drop` impl).
#[derive(Debug)]
pub struct CgroupV2Manager {
    /// Absolute path of the agent cgroup, i.e. `<parent>/synwire/agents/<agent_id>`.
    base_path: PathBuf,
}
impl CgroupV2Manager {
pub async fn is_available() -> bool {
fs::metadata("/sys/fs/cgroup/cgroup.controllers")
.await
.is_ok()
}
pub async fn discover_cgroup_parent() -> Result<PathBuf, SandboxError> {
let contents = fs::read_to_string("/proc/self/cgroup")
.await
.map_err(|e| SandboxError::CgroupParseFailed(format!("read /proc/self/cgroup: {e}")))?;
let cgroup_rel = contents
.lines()
.find_map(|line| {
let mut parts = line.splitn(3, ':');
let hier = parts.next()?;
let _ = parts.next(); let path = parts.next()?;
if hier == "0" {
Some(path.trim().to_string())
} else {
None
}
})
.ok_or_else(|| {
SandboxError::CgroupParseFailed(
"no unified hierarchy (0::) entry in /proc/self/cgroup".into(),
)
})?;
let process_cgroup =
PathBuf::from("/sys/fs/cgroup").join(cgroup_rel.trim_start_matches('/'));
let parent = process_cgroup
.parent()
.unwrap_or(&process_cgroup)
.to_path_buf();
debug!(?process_cgroup, ?parent, "discovered process cgroup");
Ok(parent)
}
pub async fn new(
agent_id: Uuid,
resources: Option<&synwire_core::agents::sandbox::ResourceLimits>,
) -> Result<Self, SandboxError> {
let cgroup_parent = Self::discover_cgroup_parent().await?;
let synwire_root = cgroup_parent.join("synwire");
let agents_root = synwire_root.join("agents");
let base_path = agents_root.join(agent_id.to_string());
let parent_control = cgroup_parent.join("cgroup.subtree_control");
let _ = fs::write(&parent_control, "+cpu +memory +pids").await;
if !synwire_root.exists() {
fs::create_dir_all(&synwire_root)
.await
.map_err(SandboxError::CgroupIo)?;
}
let synwire_control = synwire_root.join("cgroup.subtree_control");
let _ = fs::write(&synwire_control, "+cpu +memory +pids").await;
let test_path = agents_root.join("_write_test");
match fs::create_dir_all(&test_path).await {
Ok(()) => {
let _ = fs::remove_dir(&test_path).await;
}
Err(_) => {
return Err(SandboxError::CgroupNotWritable {
path: agents_root.display().to_string(),
});
}
}
fs::create_dir_all(&base_path)
.await
.map_err(SandboxError::CgroupIo)?;
let mgr = Self { base_path };
if let Some(limits) = resources {
mgr.apply_limits(limits).await?;
}
Ok(mgr)
}
#[must_use]
pub fn base_path(&self) -> &std::path::Path {
&self.base_path
}
pub async fn move_pid(&self, pid: u32) -> Result<(), SandboxError> {
let procs_path = self.base_path.join("cgroup.procs");
fs::write(&procs_path, pid.to_string())
.await
.map_err(SandboxError::CgroupIo)
}
pub async fn read_stats(&self) -> Option<CgroupStats> {
let cpu_ns = read_cpu_usage_ns(&self.base_path).await;
let memory_bytes = read_memory_current(&self.base_path).await;
match (cpu_ns, memory_bytes) {
(Some(cpu_usage_ns), Some(memory_current_bytes)) => Some(CgroupStats {
cpu_usage_ns,
memory_current_bytes,
}),
_ => None,
}
}
pub async fn kill_all(&self) -> Result<(), SandboxError> {
let kill_path = self.base_path.join("cgroup.kill");
if fs::write(&kill_path, "1").await.is_ok() {
return Ok(());
}
let procs_path = self.base_path.join("cgroup.procs");
let contents = fs::read_to_string(&procs_path)
.await
.map_err(SandboxError::CgroupIo)?;
for line in contents.lines() {
let Ok(pid_raw) = line.trim().parse::<i32>() else {
continue;
};
let pid = nix::unistd::Pid::from_raw(pid_raw);
let _ = nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGKILL);
}
Ok(())
}
pub async fn destroy(&self) {
if let Err(e) = fs::remove_dir(&self.base_path).await {
warn!(path = %self.base_path.display(), error = %e, "failed to remove agent cgroup");
}
}
async fn apply_limits(
&self,
limits: &synwire_core::agents::sandbox::ResourceLimits,
) -> Result<(), SandboxError> {
if let Some(mem_bytes) = limits.memory_bytes {
fs::write(self.base_path.join("memory.max"), mem_bytes.to_string())
.await
.map_err(SandboxError::CgroupIo)?;
}
if let Some(cpu_quota) = limits.cpu_quota {
let period_us = 100_000u64;
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
let quota_us = (f64::from(cpu_quota) * period_us as f64) as u64;
let content = format!("{quota_us} {period_us}");
fs::write(self.base_path.join("cpu.max"), content)
.await
.map_err(SandboxError::CgroupIo)?;
}
if let Some(max_pids) = limits.max_pids {
fs::write(self.base_path.join("pids.max"), max_pids.to_string())
.await
.map_err(SandboxError::CgroupIo)?;
}
Ok(())
}
}
impl Drop for CgroupV2Manager {
    /// Synchronous best-effort teardown of the agent cgroup.
    ///
    /// Kills every process in the cgroup via `cgroup.kill`, falling back to `SIGKILL`
    /// for each PID in `cgroup.procs`. It then removes the directory. Blocking `std::fs`
    /// is used because `drop` cannot `.await`.
    ///
    /// A cgroup directory that no longer exists (e.g. because `destroy()` already
    /// removed it) is treated as cleaned up rather than reported as a failure.
    fn drop(&mut self) {
        let kill_path = self.base_path.join("cgroup.kill");
        if std::fs::write(&kill_path, "1").is_err() {
            // `cgroup.kill` is unavailable or unwritable: signal each member individually.
            if let Ok(contents) = std::fs::read_to_string(self.base_path.join("cgroup.procs")) {
                for pid_raw in contents
                    .lines()
                    .filter_map(|line| line.trim().parse::<i32>().ok())
                {
                    // Per-PID failures (e.g. the process already exited) are ignored.
                    let _ = nix::sys::signal::kill(
                        nix::unistd::Pid::from_raw(pid_raw),
                        nix::sys::signal::Signal::SIGKILL,
                    );
                }
            }
        }
        // NOTE(review): killing is asynchronous, so `remove_dir` may still fail (EBUSY)
        // if killed processes have not exited yet; that case is only logged.
        match std::fs::remove_dir(&self.base_path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                warn!(path = %self.base_path.display(), error = %e, "failed to remove agent cgroup on drop");
            }
        }
    }
}
/// Reads cumulative CPU time of the cgroup at `base`, in nanoseconds.
///
/// Parses the first well-formed `usage_usec <n>` line of `cpu.stat` and converts
/// microseconds to nanoseconds. The conversion saturates at `u64::MAX` instead of
/// overflowing. Returns `None` if the file cannot be read or has no parseable
/// `usage_usec` line.
async fn read_cpu_usage_ns(base: &std::path::Path) -> Option<u64> {
    let content = fs::read_to_string(base.join("cpu.stat")).await.ok()?;
    content.lines().find_map(|line| {
        let (key, value) = line.split_once(' ')?;
        if key != "usage_usec" {
            return None;
        }
        value
            .trim()
            .parse::<u64>()
            .ok()
            .map(|usec| usec.saturating_mul(1_000))
    })
}
/// Reads `memory.current` of the cgroup at `base`, in bytes.
///
/// Returns `None` when the file is unreadable or its trimmed content is not a
/// plain unsigned integer.
async fn read_memory_current(base: &std::path::Path) -> Option<u64> {
    let current_path = base.join("memory.current");
    match fs::read_to_string(&current_path).await {
        Ok(raw) => raw.trim().parse().ok(),
        Err(_) => None,
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Test-local copy of the `/proc/self/cgroup` parsing. Returns the absolute
    /// cgroupfs path of the unified-hierarchy (`0::`) entry and panics if there is none.
    fn parse_process_cgroup(content: &str) -> PathBuf {
        let unified_rel = content
            .lines()
            .filter_map(|line| {
                let (hierarchy, rest) = line.split_once(':')?;
                let (_controllers, path) = rest.split_once(':')?;
                Some((hierarchy, path))
            })
            .find(|&(hierarchy, _)| hierarchy == "0")
            .map(|(_, path)| path.trim().to_string())
            .unwrap();
        let rel = unified_rel.trim_start_matches('/');
        PathBuf::from("/sys/fs/cgroup").join(rel)
    }

    /// Parent directory of `path`, or `path` itself when it has none.
    fn parent_or_self(path: &std::path::Path) -> PathBuf {
        path.parent().unwrap_or(path).to_path_buf()
    }

    #[test]
    fn parse_cgroup_line_unified_hierarchy() {
        // A v1 `cpuset` line precedes the unified entry and must be skipped.
        let proc_self_cgroup =
            "12:cpuset:/\n0::/user.slice/user-1000.slice/user@1000.service/app.slice\n";
        let expected =
            PathBuf::from("/sys/fs/cgroup/user.slice/user-1000.slice/user@1000.service/app.slice");
        assert_eq!(parse_process_cgroup(proc_self_cgroup), expected);
    }

    #[test]
    fn cgroup_parent_is_one_level_up() {
        let proc_self_cgroup =
            "0::/user.slice/user-1000.slice/user@1000.service/app.slice/code.scope\n";
        let parent = parent_or_self(&parse_process_cgroup(proc_self_cgroup));
        assert_eq!(
            parent,
            PathBuf::from("/sys/fs/cgroup/user.slice/user-1000.slice/user@1000.service/app.slice")
        );

        let agent_cgroup = ["synwire", "agents", "test-uuid"]
            .iter()
            .fold(parent, |acc, segment| acc.join(segment));
        let expected_agent = PathBuf::from(
            "/sys/fs/cgroup/user.slice/user-1000.slice/user@1000.service/app.slice/synwire/agents/test-uuid",
        );
        assert_eq!(agent_cgroup, expected_agent);
    }

    #[test]
    fn cgroup_parent_fallback_at_root_level() {
        let parent = parent_or_self(&parse_process_cgroup("0::/\n"));
        // NOTE(review): for `0::/` this copy of the logic builds `/sys/fs/cgroup/`, whose
        // parent is `/sys/fs` (outside cgroupfs), so this check is deliberately loose.
        assert!(parent.starts_with("/sys"));
    }
}