use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{ContainerId, ContainerState, Runtime};
use crate::MacSandboxConfig;
use std::collections::HashMap;
use std::fmt::Write;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{RegistryAuth, ServiceSpec};
/// Level of GPU access granted to a sandboxed container; selects which
/// pre-baked SBPL GPU section (if any) is appended to the profile.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GpuAccess {
// No GPU-related rules are emitted at all.
None,
// Full Metal compute: broad IOKit user-client set plus shader-compilation
// Mach/XPC services (see METAL_COMPUTE_PROFILE_SECTION).
MetalCompute,
// Minimal set intended for Metal Performance Shaders with pre-compiled
// kernels (see MPS_ONLY_PROFILE_SECTION).
MpsOnly,
}
/// Network policy compiled into the sandbox profile.
#[derive(Debug, Clone)]
pub enum NetworkAccess {
// No network rules emitted; everything is blocked by `(deny default)`.
None,
// Loopback-only networking with explicit port lists.
LocalhostOnly {
// Localhost ports the workload may bind.
bind_ports: Vec<u16>,
// Localhost ports the workload may connect out to.
connect_ports: Vec<u16>,
},
// Unrestricted inbound/outbound/bind plus raw system sockets.
Full,
}
/// Inputs for generating one container's sandbox profile and process limits.
#[derive(Debug, Clone)]
pub struct SandboxConfig {
// Container root filesystem; granted read/write + file-map-executable.
pub rootfs_dir: PathBuf,
// Workspace directory; granted read/write.
pub workspace_dir: PathBuf,
// GPU profile section to include.
pub gpu_access: GpuAccess,
// Network profile section to include.
pub network_access: NetworkAccess,
// Extra directories allowed read/write (volume mounts).
pub writable_dirs: Vec<PathBuf>,
// Extra directories allowed read-only (volume mounts).
pub readonly_dirs: Vec<PathBuf>,
// Applied as RLIMIT_NOFILE in the child (see `set_resource_limits`).
pub max_files: u64,
// Optional RLIMIT_CPU in seconds (see `set_resource_limits`).
pub cpu_time_limit: Option<u64>,
// Optional RSS ceiling in bytes; not expressible in SBPL, enforced
// out-of-band by `memory_watchdog`.
pub memory_limit: Option<u64>,
}
/// SBPL fragment granting full Metal compute access: GPU IOKit user clients,
/// the macOS 26+ fine-grained service/user-client opens, GPU IOKit
/// properties, Mach/XPC shader-compilation services, Metal preference
/// domains, and GPU driver bundles/frameworks on disk.
/// Appended verbatim to the generated profile for `GpuAccess::MetalCompute`.
const METAL_COMPUTE_PROFILE_SECTION: &str = "\
; --- GPU: Full Metal Compute ---
; IOKit user clients for GPU hardware access
; Apple Silicon (M1/M2/M3/M4/M5): AGXDeviceUserClient is the actual user client
; class opened by Metal. AGXSharedUserClient handles multi-process GPU sharing.
; The IOAccel* classes are IOKit compatibility shims (still needed).
(allow iokit-open
(iokit-user-client-class \"AGXDeviceUserClient\")
(iokit-user-client-class \"AGXSharedUserClient\")
(iokit-user-client-class \"IOSurfaceRootUserClient\")
(iokit-user-client-class \"IOSurfaceAcceleratorClient\")
(iokit-user-client-class \"IOAccelDevice\")
(iokit-user-client-class \"IOAccelDevice2\")
(iokit-user-client-class \"IOAccelContext\")
(iokit-user-client-class \"IOAccelContext2\")
(iokit-user-client-class \"IOAccelSharedUserClient\")
(iokit-user-client-class \"IOAccelSharedUserClient2\")
(iokit-user-client-class \"IOAccelSubmitter2\")
(iokit-user-client-class \"RootDomainUserClient\"))
; IOKit service-level access (macOS 26+ fine-grained syntax)
; AGXAcceleratorG* prefix matches all Apple Silicon GPU generations.
(allow iokit-open-service
(iokit-user-client-class \"IOSurfaceRoot\")
(iokit-registry-entry-class-prefix \"AGXAcceleratorG\"))
; IOKit user-client-level access (macOS 26+ fine-grained syntax)
(allow iokit-open-user-client
(iokit-user-client-class \"AGXDeviceUserClient\")
(iokit-user-client-class \"AGXSharedUserClient\")
(iokit-user-client-class \"IOSurfaceRootUserClient\")
(iokit-user-client-class \"IOSurfaceAcceleratorClient\"))
; GPU IOKit properties (comprehensive set from Apple's safety-inference profile)
(allow iokit-get-properties
(iokit-property \"AGCInfo\")
(iokit-property \"AGXCliqueTracingDefaults\")
(iokit-property \"AGXInternalPerfCounterResourcesPath\")
(iokit-property \"AGXLimitersDirName\")
(iokit-property \"AGXParameterBufferMaxSize\")
(iokit-property \"AGXParameterBufferMaxSizeEverMemless\")
(iokit-property \"AGXParameterBufferMaxSizeNeverMemless\")
(iokit-property \"AGXTraceCodeVersion\")
(iokit-property \"CFBundleIdentifier\")
(iokit-property \"CFBundleIdentifierKernel\")
(iokit-property \"chip-id\")
(iokit-property \"CommandSubmissionEnabled\")
(iokit-property \"CompactVRAM\")
(iokit-property \"EnableBlitLib\")
(iokit-property \"gpu-core-count\")
(iokit-property \"GPUConfigurationVariable\")
(iokit-property \"GPUDCCDisplayable\")
(iokit-property \"GPUDebugNullClientMask\")
(iokit-property \"GpuDebugPolicy\")
(iokit-property \"GPURawCounterBundleName\")
(iokit-property \"GPURawCounterPluginClassName\")
(iokit-property \"IOClass\")
(iokit-property \"IOClassNameOverride\")
(iokit-property \"IOGeneralInterest\")
(iokit-property \"IOGLBundleName\")
(iokit-property \"IOGLESBundleName\")
(iokit-property \"IOGLESDefaultUseMetal\")
(iokit-property \"IOGLESMetalBundleName\")
(iokit-property \"IOMatchCategory\")
(iokit-property \"IOMatchedAtBoot\")
(iokit-property \"IONameMatch\")
(iokit-property \"IONameMatched\")
(iokit-property \"IOPCIMatch\")
(iokit-property \"IOPersonalityPublisher\")
(iokit-property \"IOPowerManagement\")
(iokit-property \"IOProbeScore\")
(iokit-property \"IOProviderClass\")
(iokit-property \"IORegistryEntryPropertyKeys\")
(iokit-property \"IOReportLegend\")
(iokit-property \"IOReportLegendPublic\")
(iokit-property \"IOSourceVersion\")
(iokit-property \"KDebugVersion\")
(iokit-property \"MetalCoalesce\")
(iokit-property \"MetalPluginClassName\")
(iokit-property \"MetalPluginName\")
(iokit-property \"MetalStatisticsName\")
(iokit-property \"MetalStatisticsScriptName\")
(iokit-property \"model\")
(iokit-property \"PerformanceStatistics\")
(iokit-property \"Removable\")
(iokit-property \"SafeEjectRequested\")
(iokit-property \"SchedulerState\")
(iokit-property \"SCMBuildTime\")
(iokit-property \"SCMVersionNumber\")
(iokit-property \"soc-generation\")
(iokit-property \"SurfaceList\")
(iokit-property \"vendor-id\")
(iokit-property \"device-id\")
(iokit-property \"class-code\"))
; Mach services for Metal shader compilation and GPU memory
(allow mach-lookup
(global-name \"com.apple.MTLCompilerService\")
(global-name \"com.apple.CARenderServer\")
(global-name \"com.apple.PowerManagement.control\")
(global-name \"com.apple.gpu.process\")
(global-name \"com.apple.gpumemd.source\")
(global-name \"com.apple.cvmsServ\"))
; XPC services for shader compilation (Apple Silicon)
(allow mach-lookup
(xpc-service-name \"com.apple.MTLCompilerService\")
(xpc-service-name-prefix \"com.apple.AGXCompilerService\"))
; User preferences for Metal/OpenGL
(allow user-preference-read
(preference-domain \"com.apple.opengl\")
(preference-domain \"com.apple.Metal\")
(preference-domain \"com.nvidia.OpenGL\"))
; GPU driver bundles and libraries
(allow file-read*
(subpath \"/Library/GPUBundles\")
(subpath \"/System/Library/Frameworks/Metal.framework\")
(subpath \"/System/Library/Frameworks/MetalPerformanceShaders.framework\")
(subpath \"/System/Library/Frameworks/MetalPerformanceShadersGraph.framework\")
(subpath \"/System/Library/PrivateFrameworks/GPUCompiler.framework\"))
";
/// SBPL fragment for `GpuAccess::MpsOnly`: a reduced IOKit/Mach surface for
/// Metal Performance Shaders. Still includes MTLCompilerService because (per
/// the inline note) MPSGraph performs internal JIT kernel fusion.
/// Appended verbatim to the generated profile.
const MPS_ONLY_PROFILE_SECTION: &str = "\
; --- GPU: MPS Only (pre-compiled kernels, no shader compilation) ---
; IOKit user clients for GPU hardware access (minimal set)
; AGXDeviceUserClient is required -- MTLCreateSystemDefaultDevice() opens this class.
(allow iokit-open
(iokit-user-client-class \"AGXDeviceUserClient\")
(iokit-user-client-class \"AGXSharedUserClient\")
(iokit-user-client-class \"IOSurfaceRootUserClient\")
(iokit-user-client-class \"IOAccelDevice2\")
(iokit-user-client-class \"IOAccelContext2\")
(iokit-user-client-class \"IOAccelSharedUserClient2\")
(iokit-user-client-class \"RootDomainUserClient\"))
; IOKit service-level access (macOS 26+ fine-grained syntax)
(allow iokit-open-service
(iokit-user-client-class \"IOSurfaceRoot\")
(iokit-registry-entry-class-prefix \"AGXAcceleratorG\"))
; IOKit user-client-level access (macOS 26+ fine-grained syntax)
(allow iokit-open-user-client
(iokit-user-client-class \"AGXDeviceUserClient\")
(iokit-user-client-class \"IOSurfaceRootUserClient\"))
; GPU IOKit properties (minimal set for MPS)
(allow iokit-get-properties
(iokit-property \"MetalPluginClassName\")
(iokit-property \"MetalPluginName\")
(iokit-property \"IOClass\")
(iokit-property \"IOGLESDefaultUseMetal\")
(iokit-property \"IORegistryEntryPropertyKeys\")
(iokit-property \"IOSourceVersion\")
(iokit-property \"GPUConfigurationVariable\")
(iokit-property \"GPURawCounterBundleName\")
(iokit-property \"gpu-core-count\")
(iokit-property \"model\")
(iokit-property \"vendor-id\")
(iokit-property \"device-id\")
(iokit-property \"soc-generation\"))
; Mach services for MPS
; MTLCompilerService is required because MPSGraph on macOS 26+ uses JIT
; compilation internally for kernel fusion, even for pre-compiled MPS kernels.
; gpumemd.source is needed for GPU memory management.
(allow mach-lookup
(global-name \"com.apple.MTLCompilerService\")
(global-name \"com.apple.PowerManagement.control\")
(global-name \"com.apple.gpumemd.source\"))
; XPC service for MTLCompilerService (required by MPSGraph JIT)
(allow mach-lookup
(xpc-service-name \"com.apple.MTLCompilerService\"))
; User preferences
(allow user-preference-read
(preference-domain \"com.apple.Metal\"))
; MPS framework access
(allow file-read*
(subpath \"/Library/GPUBundles\")
(subpath \"/System/Library/Frameworks/Metal.framework\")
(subpath \"/System/Library/Frameworks/MetalPerformanceShaders.framework\")
(subpath \"/System/Library/Frameworks/MetalPerformanceShadersGraph.framework\"))
";
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn generate_sandbox_profile(config: &SandboxConfig) -> String {
let mut profile = String::with_capacity(4096);
profile.push_str("(version 1)\n");
profile.push_str("(deny default)\n");
profile.push('\n');
profile.push_str("; --- Base process rules ---\n");
profile.push_str("(allow process-exec)\n");
profile.push_str("(allow process-fork)\n");
profile.push_str("(allow signal (target same-sandbox))\n");
profile.push_str("(allow process-info* (target self))\n");
profile.push_str("(allow process-info-pidinfo)\n");
profile.push_str("(allow process-info-rusage)\n");
profile.push('\n');
profile.push_str("; --- System libraries (required for any process to run) ---\n");
profile.push_str("(allow file-read*\n");
profile.push_str(" (subpath \"/usr/lib\")\n");
profile.push_str(" (subpath \"/System/Library/Frameworks\")\n");
profile.push_str(" (subpath \"/System/Library/PrivateFrameworks\")\n");
profile.push_str(" (subpath \"/System/Library/Extensions\")\n");
profile.push_str(" (subpath \"/System/Library/ColorSync\")\n");
profile.push_str(" (literal \"/\")\n");
profile.push_str(" (literal \"/dev/random\")\n");
profile.push_str(" (literal \"/dev/urandom\"))\n");
profile.push('\n');
profile.push_str("; --- Executable mapping (required for dyld) ---\n");
profile.push_str("(allow file-map-executable\n");
profile.push_str(" (subpath \"/usr/lib\")\n");
profile.push_str(" (subpath \"/System/Library/Frameworks\")\n");
profile.push_str(" (subpath \"/System/Library/PrivateFrameworks\")\n");
profile.push_str(" (subpath \"/System/Library/Extensions\"))\n");
profile.push('\n');
profile.push_str("; --- System info (hw detection, etc.) ---\n");
profile.push_str("(allow sysctl-read)\n");
profile.push_str("(allow system-info)\n");
profile.push('\n');
profile.push_str("; --- Mach basics ---\n");
profile.push_str("(allow mach-lookup\n");
profile.push_str(" (global-name \"com.apple.system.opendirectoryd.libinfo\"))\n");
profile.push('\n');
profile.push_str("; --- Container rootfs ---\n");
let _ = writeln!(
profile,
"(allow file-read* file-write* (subpath \"{}\"))",
config.rootfs_dir.display()
);
let _ = writeln!(
profile,
"(allow file-map-executable (subpath \"{}\"))",
config.rootfs_dir.display()
);
profile.push('\n');
profile.push_str("; --- Workspace directory ---\n");
let _ = writeln!(
profile,
"(allow file-read* file-write* (subpath \"{}\"))",
config.workspace_dir.display()
);
profile.push('\n');
if !config.writable_dirs.is_empty() {
profile.push_str("; --- Volume mounts (writable) ---\n");
for dir in &config.writable_dirs {
let _ = writeln!(
profile,
"(allow file-read* file-write* (subpath \"{}\"))",
dir.display()
);
}
profile.push('\n');
}
if !config.readonly_dirs.is_empty() {
profile.push_str("; --- Volume mounts (read-only) ---\n");
for dir in &config.readonly_dirs {
let _ = writeln!(
profile,
"(allow file-read* (subpath \"{}\"))",
dir.display()
);
}
profile.push('\n');
}
match config.gpu_access {
GpuAccess::MetalCompute => {
profile.push_str(METAL_COMPUTE_PROFILE_SECTION);
}
GpuAccess::MpsOnly => {
profile.push_str(MPS_ONLY_PROFILE_SECTION);
}
GpuAccess::None => {}
}
match &config.network_access {
NetworkAccess::None => {
profile.push_str("; --- Network: DENIED ---\n\n");
}
NetworkAccess::LocalhostOnly {
bind_ports,
connect_ports,
} => {
profile.push_str("; --- Network: localhost only ---\n");
for port in bind_ports {
let _ = writeln!(
profile,
"(allow network-bind (local ip \"localhost:{port}\"))",
);
}
for port in connect_ports {
let _ = writeln!(
profile,
"(allow network-outbound (remote ip \"localhost:{port}\"))",
);
}
if !bind_ports.is_empty() {
profile.push_str("(allow network-inbound (local ip \"localhost:*\"))\n");
}
profile.push('\n');
}
NetworkAccess::Full => {
profile.push_str("; --- Network: full access ---\n");
profile.push_str("(allow network-outbound)\n");
profile.push_str("(allow network-inbound)\n");
profile.push_str("(allow network-bind)\n");
profile.push_str("(allow system-socket)\n");
profile.push('\n');
}
}
profile.push_str("; --- I/O essentials ---\n");
profile.push_str("(allow file-write-data\n");
profile.push_str(" (require-all (literal \"/dev/null\") (vnode-type CHARACTER-DEVICE)))\n");
profile.push_str("(allow file-read-data\n");
profile.push_str(" (require-all (literal \"/dev/null\") (vnode-type CHARACTER-DEVICE)))\n");
profile.push_str("(allow pseudo-tty)\n");
profile.push_str("(allow file-read* file-write* file-ioctl (literal \"/dev/ptmx\"))\n");
profile.push('\n');
profile.push_str("; --- IPC ---\n");
profile.push_str("(allow ipc-posix-sem)\n");
profile.push_str("(allow ipc-posix-shm)\n");
profile.push('\n');
profile
}
/// Raw FFI bindings to the libsystem Seatbelt API (`sandbox_init`).
mod seatbelt_ffi {
use std::os::raw::c_char;
#[link(name = "System", kind = "dylib")]
extern "C" {
// Apply an SBPL profile to the current process. Returns 0 on success;
// on failure it may store a library-owned error string in `errorbuf`.
pub fn sandbox_init(profile: *const c_char, flags: u64, errorbuf: *mut *mut c_char) -> i32;
// Release an error buffer produced by `sandbox_init`.
pub fn sandbox_free_error(errorbuf: *mut c_char);
}
}
/// Apply the given SBPL profile to the current process via `sandbox_init`.
///
/// Returns `InvalidInput` if the profile contains an interior NUL byte, and
/// `PermissionDenied` (with the library's error text when available) if
/// `sandbox_init` itself fails.
#[allow(unsafe_code)]
fn apply_seatbelt_profile(sbpl: &str) -> std::io::Result<()> {
    use std::ffi::{CStr, CString};
    use std::ptr;

    // sandbox_init wants a NUL-terminated C string; an interior NUL is
    // rejected here instead of silently truncating the profile.
    let profile = CString::new(sbpl)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;

    let mut err_ptr: *mut std::os::raw::c_char = ptr::null_mut();
    // SAFETY: `profile` outlives the call and `err_ptr` is a valid out-pointer.
    let rc = unsafe { seatbelt_ffi::sandbox_init(profile.as_ptr(), 0, &raw mut err_ptr) };
    if rc == 0 {
        return Ok(());
    }

    // On failure, copy out the library-owned message (if any) and free it.
    let detail = if err_ptr.is_null() {
        format!("sandbox_init returned error code {rc}")
    } else {
        // SAFETY: non-null `err_ptr` points at a NUL-terminated string owned
        // by libsystem until we release it with sandbox_free_error.
        let text = unsafe { CStr::from_ptr(err_ptr).to_string_lossy().into_owned() };
        unsafe { seatbelt_ffi::sandbox_free_error(err_ptr) };
        text
    };
    Err(std::io::Error::new(
        std::io::ErrorKind::PermissionDenied,
        format!("Failed to initialize sandbox: {detail}"),
    ))
}
// macOS `clonefile(2)`: create a copy-on-write clone of `src` at `dst` on
// APFS. Declared directly here; flags = 0 means default clone behavior.
extern "C" {
fn clonefile(
src: *const libc::c_char,
dst: *const libc::c_char,
flags: libc::c_int,
) -> libc::c_int;
}
/// Attempt an APFS copy-on-write clone of `src` to `dst`.
///
/// Returns `Ok(true)` when the clone succeeded, `Ok(false)` when the
/// operation is unsupported / cross-device / already exists (caller should
/// fall back to a byte copy), and `Err` for any other failure.
#[allow(unsafe_code)]
fn clone_file_apfs(src: &Path, dst: &Path) -> std::io::Result<bool> {
    use std::ffi::CString;

    // clonefile(2) needs NUL-terminated C strings, so the paths must be
    // valid UTF-8 and NUL-free; anything else is InvalidInput.
    let path_to_c = |path: &Path, err_msg: &'static str| -> std::io::Result<CString> {
        let utf8 = path
            .to_str()
            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, err_msg))?;
        CString::new(utf8).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
    };
    let c_src = path_to_c(src, "Invalid src path")?;
    let c_dst = path_to_c(dst, "Invalid dst path")?;

    // SAFETY: both pointers reference valid NUL-terminated strings that live
    // for the duration of the call.
    if unsafe { clonefile(c_src.as_ptr(), c_dst.as_ptr(), 0) } == 0 {
        return Ok(true);
    }

    let err = std::io::Error::last_os_error();
    match err.raw_os_error() {
        // These errnos mean "clone not possible here" rather than a hard
        // failure; signal the caller to fall back to a plain copy.
        Some(code) if code == libc::ENOTSUP || code == libc::EXDEV || code == libc::EEXIST => {
            Ok(false)
        }
        _ => Err(err),
    }
}
/// Recursively replicate `src` into `dst`, using APFS copy-on-write clones
/// for regular files where possible and byte copies otherwise. Symlinks are
/// recreated with their original targets (not followed); other node types
/// (fifos, sockets, devices) are skipped. Finally mirrors the source
/// directory's permission bits onto `dst`.
async fn clone_directory_recursive(src: &Path, dst: &Path) -> std::io::Result<()> {
    tokio::fs::create_dir_all(dst).await?;

    let mut reader = tokio::fs::read_dir(src).await?;
    while let Some(entry) = reader.next_entry().await? {
        let from = entry.path();
        let to = dst.join(entry.file_name());
        let kind = entry.file_type().await?;

        if kind.is_dir() {
            // Async recursion requires boxing the future.
            Box::pin(clone_directory_recursive(&from, &to)).await?;
        } else if kind.is_file() {
            // clonefile is a blocking syscall; keep it off the async runtime.
            let (blocking_src, blocking_dst) = (from.clone(), to.clone());
            let cloned = tokio::task::spawn_blocking(move || {
                clone_file_apfs(&blocking_src, &blocking_dst)
            })
            .await
            .map_err(std::io::Error::other)??;
            if !cloned {
                // Clone unavailable (non-APFS, cross-device, ...): plain copy.
                tokio::fs::copy(&from, &to).await?;
            }
        } else if kind.is_symlink() {
            // Preserve the link target verbatim rather than following it.
            let target = tokio::fs::read_link(&from).await?;
            tokio::fs::symlink(&target, &to).await?;
        }
    }

    // Apply the directory's own permissions last so restrictive modes do not
    // interfere with populating it.
    let perms = tokio::fs::metadata(src).await?.permissions();
    tokio::fs::set_permissions(dst, perms).await?;
    Ok(())
}
/// Read the resident set size (bytes) of `pid` via
/// `proc_pidinfo(PROC_PIDTASKINFO)` from libproc.
///
/// # Errors
/// Returns the OS error when `proc_pidinfo` fails (e.g. the process has
/// exited or access is denied).
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
fn get_process_rss(pid: u32) -> std::io::Result<u64> {
// Mirrors the field order of macOS `struct proc_taskinfo` (libproc.h);
// the layout must match exactly because the kernel fills it by size.
#[repr(C)]
#[allow(non_snake_case)]
#[allow(clippy::struct_field_names)]
struct ProcTaskInfo {
pti_virtual_size: u64,
pti_resident_size: u64,
pti_total_user: u64,
pti_total_system: u64,
pti_threads_user: u64,
pti_threads_system: u64,
pti_policy: i32,
pti_faults: i32,
pti_pageins: i32,
pti_cow_faults: i32,
pti_messages_sent: i32,
pti_messages_received: i32,
pti_syscalls_mach: i32,
pti_syscalls_unix: i32,
pti_csw: i32,
pti_threadnum: i32,
pti_numrunning: i32,
pti_priority: i32,
}
extern "C" {
fn proc_pidinfo(
pid: libc::c_int,
flavor: libc::c_int,
arg: u64,
buffer: *mut libc::c_void,
buffersize: libc::c_int,
) -> libc::c_int;
}
// PROC_PIDTASKINFO flavor constant from <libproc.h>.
const PROC_PIDTASKINFO: libc::c_int = 4;
// Zeroing is sound here: ProcTaskInfo is all plain integers.
let mut info: ProcTaskInfo = unsafe { std::mem::zeroed() };
let size = std::mem::size_of::<ProcTaskInfo>() as libc::c_int;
let ret = unsafe {
proc_pidinfo(
pid as libc::c_int,
PROC_PIDTASKINFO,
0,
(&raw mut info).cast::<libc::c_void>(),
size,
)
};
// proc_pidinfo returns the number of bytes copied; <= 0 indicates failure.
if ret <= 0 {
return Err(std::io::Error::last_os_error());
}
Ok(info.pti_resident_size)
}
/// Read CPU time and resident set size for `pid` via
/// `proc_pidinfo(PROC_PIDTASKINFO)`.
///
/// Returns `(cpu_time, rss_bytes)` where `cpu_time` is user+system time
/// divided by 1000.
///
/// # Errors
/// Returns `AgentError::Internal` wrapping the OS error if `proc_pidinfo`
/// fails.
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
fn get_process_stats(pid: u32) -> Result<(u64, u64)> {
// Mirrors macOS `struct proc_taskinfo` (libproc.h); duplicated from
// `get_process_rss` -- keep the two definitions in sync.
#[repr(C)]
#[allow(non_snake_case)]
#[allow(clippy::struct_field_names)]
struct ProcTaskInfo {
pti_virtual_size: u64,
pti_resident_size: u64,
pti_total_user: u64,
pti_total_system: u64,
pti_threads_user: u64,
pti_threads_system: u64,
pti_policy: i32,
pti_faults: i32,
pti_pageins: i32,
pti_cow_faults: i32,
pti_messages_sent: i32,
pti_messages_received: i32,
pti_syscalls_mach: i32,
pti_syscalls_unix: i32,
pti_csw: i32,
pti_threadnum: i32,
pti_numrunning: i32,
pti_priority: i32,
}
extern "C" {
fn proc_pidinfo(
pid: libc::c_int,
flavor: libc::c_int,
arg: u64,
buffer: *mut libc::c_void,
buffersize: libc::c_int,
) -> libc::c_int;
}
// PROC_PIDTASKINFO flavor constant from <libproc.h>.
const PROC_PIDTASKINFO: libc::c_int = 4;
// Zeroing is sound: the struct is all plain integers.
let mut info: ProcTaskInfo = unsafe { std::mem::zeroed() };
let size = std::mem::size_of::<ProcTaskInfo>() as libc::c_int;
let ret = unsafe {
proc_pidinfo(
pid as libc::c_int,
PROC_PIDTASKINFO,
0,
(&raw mut info).cast::<libc::c_void>(),
size,
)
};
// proc_pidinfo returns bytes copied; <= 0 indicates failure.
if ret <= 0 {
return Err(AgentError::Internal(format!(
"proc_pidinfo failed for pid {pid}: {}",
std::io::Error::last_os_error()
)));
}
// NOTE(review): this assumes pti_total_user/system are nanoseconds (so
// /1000 yields microseconds). On Apple Silicon libproc reports these in
// Mach absolute-time units -- confirm the conversion on target hardware.
let cpu_usec = (info.pti_total_user + info.pti_total_system) / 1000;
let rss = info.pti_resident_size;
Ok((cpu_usec, rss))
}
/// Apply rlimits to the *current* process: RLIMIT_NOFILE is always set to
/// `max_files`, and RLIMIT_CPU to `cpu_seconds` when provided. Both soft and
/// hard limits are set to the same value. Intended to run in the child via
/// `pre_exec`, after fork and before exec.
#[allow(unsafe_code)]
fn set_resource_limits(max_files: u64, cpu_seconds: Option<u64>) -> std::io::Result<()> {
    // Apply one rlimit with soft == hard and surface errno on failure.
    let apply = |resource, value: u64| -> std::io::Result<()> {
        let limit = libc::rlimit {
            rlim_cur: value,
            rlim_max: value,
        };
        // SAFETY: `limit` is a fully initialized rlimit valid for the call.
        if unsafe { libc::setrlimit(resource, &raw const limit) } == 0 {
            Ok(())
        } else {
            Err(std::io::Error::last_os_error())
        }
    };

    apply(libc::RLIMIT_NOFILE, max_files)?;
    if let Some(seconds) = cpu_seconds {
        apply(libc::RLIMIT_CPU, seconds)?;
    }
    Ok(())
}
/// Poll `pid`'s resident set size every two seconds and SIGKILL it if RSS
/// ever exceeds `limit_bytes`. Exits quietly once the process is gone.
/// Substitutes for cgroup memory limits, which macOS does not provide.
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_wrap)]
async fn memory_watchdog(pid: u32, limit_bytes: u64) {
    const POLL_EVERY: Duration = Duration::from_secs(2);
    loop {
        tokio::time::sleep(POLL_EVERY).await;

        // kill(pid, 0) probes for existence without delivering a signal.
        if unsafe { libc::kill(pid as i32, 0) } != 0 {
            tracing::debug!(pid = pid, "Memory watchdog: process exited");
            return;
        }

        let rss_bytes = match get_process_rss(pid) {
            Ok(bytes) => bytes,
            Err(e) => {
                // Transient read failures are logged and retried next tick.
                tracing::debug!(pid = pid, error = %e, "Failed to read process RSS");
                continue;
            }
        };
        if rss_bytes > limit_bytes {
            tracing::warn!(
                pid = pid,
                rss_mb = rss_bytes / (1024 * 1024),
                limit_mb = limit_bytes / (1024 * 1024),
                "Memory limit exceeded, sending SIGKILL"
            );
            // Best-effort: the process may have exited between checks.
            unsafe {
                libc::kill(pid as i32, libc::SIGKILL);
            }
            return;
        }
    }
}
/// Derive the sandbox network policy from the service spec's endpoints.
///
/// A service with no declared endpoints gets unrestricted networking;
/// otherwise it is confined to localhost, may bind its endpoint ports, and
/// may connect out to DNS/HTTP/HTTPS (53/80/443) plus its own bound ports.
fn build_network_access(spec: &ServiceSpec) -> NetworkAccess {
    let bind_ports: Vec<u16> = spec
        .endpoints
        .iter()
        .map(|endpoint| endpoint.target_port())
        .collect();

    if bind_ports.is_empty() {
        return NetworkAccess::Full;
    }

    let connect_ports: Vec<u16> = [53, 80, 443]
        .into_iter()
        .chain(bind_ports.iter().copied())
        .collect();

    NetworkAccess::LocalhostOnly {
        bind_ports,
        connect_ports,
    }
}
/// Collect the directories a sandboxed service may write to: its private
/// `tmp` directory plus the target path of every declared storage mount.
fn build_writable_dirs(spec: &ServiceSpec, container_dir: &Path) -> Vec<PathBuf> {
    let storage_targets = spec.storage.iter().map(|storage| {
        // Every storage variant carries a `target` mount path.
        let target = match storage {
            zlayer_spec::StorageSpec::Named { target, .. }
            | zlayer_spec::StorageSpec::Anonymous { target, .. }
            | zlayer_spec::StorageSpec::Bind { target, .. }
            | zlayer_spec::StorageSpec::Tmpfs { target, .. }
            | zlayer_spec::StorageSpec::S3 { target, .. } => target,
        };
        PathBuf::from(target)
    });
    // tmp dir first, then storage targets, matching creation order elsewhere.
    std::iter::once(container_dir.join("tmp"))
        .chain(storage_targets)
        .collect()
}
/// Parse a human-readable memory size into bytes.
///
/// Supports binary (IEC) suffixes `Ki`, `Mi`, `Gi`, `Ti` (powers of 1024),
/// decimal (SI) suffixes `K`, `M`, `G`, `T` (powers of 1000), and bare
/// integers (bytes). Surrounding whitespace is ignored.
///
/// Returns `None` on parse failure or if the result would overflow `u64`
/// (the previous version could wrap/panic on multiplication overflow).
#[must_use]
fn parse_memory_string(s: &str) -> Option<u64> {
    let s = s.trim();
    // Two-character IEC suffixes are listed first so "Gi" is never
    // mis-read via its one-character SI cousin.
    const SUFFIXES: [(&str, u64); 8] = [
        ("Ki", 1 << 10),
        ("Mi", 1 << 20),
        ("Gi", 1 << 30),
        ("Ti", 1 << 40),
        ("K", 1_000),
        ("M", 1_000_000),
        ("G", 1_000_000_000),
        ("T", 1_000_000_000_000),
    ];
    for (suffix, multiplier) in SUFFIXES {
        if let Some(num) = s.strip_suffix(suffix) {
            // checked_mul: reject overflowing values instead of panicking
            // (debug) or silently wrapping (release).
            return num
                .parse::<u64>()
                .ok()
                .and_then(|v| v.checked_mul(multiplier));
        }
    }
    // No recognized suffix: interpret as a plain byte count.
    s.parse::<u64>().ok()
}
/// Map an OCI image reference to a filesystem-safe directory name by turning
/// the registry/tag/digest separators (`/`, `:`, `@`) into underscores.
#[must_use]
fn sanitize_image_name(image: &str) -> String {
    image
        .chars()
        .map(|c| match c {
            '/' | ':' | '@' => '_',
            other => other,
        })
        .collect()
}
/// Pick the program and argv for a container: explicit entrypoint first,
/// then bare args, then a shell fallback probed on the host and inside the
/// rootfs.
///
/// # Errors
/// `AgentError::InvalidSpec` when no command is given and no shell exists.
fn resolve_entrypoint(spec: &ServiceSpec, rootfs: &Path) -> Result<(String, Vec<String>)> {
    // Absolute programs are preferred on the host; if absent there, the same
    // path is tried relative to the rootfs; otherwise it passes through as-is.
    let resolve_program = |prog: &str| -> String {
        if !prog.starts_with('/') {
            return prog.to_string();
        }
        if Path::new(prog).exists() {
            return prog.to_string();
        }
        let in_rootfs = rootfs.join(prog.trim_start_matches('/'));
        if in_rootfs.exists() {
            in_rootfs.to_string_lossy().into_owned()
        } else {
            prog.to_string()
        }
    };

    // 1) Explicit entrypoint: argv[0] from the entrypoint, the remaining
    //    entrypoint words with any `args` appended after them.
    if let Some(entrypoint) = spec.command.entrypoint.as_deref() {
        if let Some((first, rest)) = entrypoint.split_first() {
            let mut argv = rest.to_vec();
            if let Some(extra) = spec.command.args.as_deref() {
                argv.extend(extra.iter().cloned());
            }
            return Ok((resolve_program(first), argv));
        }
    }

    // 2) Args alone: first word is the program, the rest its arguments.
    if let Some(cmd) = spec.command.args.as_deref() {
        if let Some((first, rest)) = cmd.split_first() {
            return Ok((resolve_program(first), rest.to_vec()));
        }
    }

    // 3) Shell fallback: each candidate is probed on the host first, then
    //    inside the rootfs.
    for shell in ["/bin/sh", "/bin/bash", "/usr/bin/sh"] {
        if Path::new(shell).exists() {
            return Ok((shell.to_string(), vec![]));
        }
        let in_rootfs = rootfs.join(shell.trim_start_matches('/'));
        if in_rootfs.exists() {
            return Ok((in_rootfs.to_string_lossy().into_owned(), vec![]));
        }
    }

    Err(AgentError::InvalidSpec(
        "No command specified and no shell found in rootfs".to_string(),
    ))
}
/// Everything needed to spawn one sandboxed container process.
struct SandboxSpawnParams {
// Resolved program path (host or rootfs-relative; see resolve_entrypoint).
program: String,
// Arguments passed to the program.
args: Vec<String>,
// Fully rendered SBPL profile, applied in the child via pre_exec.
sbpl_profile: String,
// Working directory for the child (the cloned rootfs).
rootfs_dir: PathBuf,
// File that receives the child's stdout.
stdout_path: PathBuf,
// File that receives the child's stderr.
stderr_path: PathBuf,
// Service spec supplying the environment variables.
spec: ServiceSpec,
// Source of the rlimits (max_files, cpu_time_limit) applied in the child.
sandbox_config: SandboxConfig,
// Reserved localhost port, exported as PORT/ZLAYER_PORT.
assigned_port: u16,
// Optional (api_url, token, socket) exported as ZLAYER_* variables.
auth_env: Option<(String, String, String)>, }
/// Spawn the container process with the Seatbelt profile and rlimits applied
/// in the child between `fork` and `exec` (via `pre_exec`).
///
/// The environment is cleared and rebuilt from the spec; `PORT` is added
/// only if the spec did not set it, `ZLAYER_PORT` always, and the optional
/// `ZLAYER_API_URL`/`ZLAYER_TOKEN`/`ZLAYER_SOCKET` trio when auth is
/// configured. stdout/stderr are redirected to the given log files.
///
/// Fixes mojibake-corrupted tokens (`¶ms` -> `&params`) that made the
/// original block invalid Rust.
///
/// # Errors
/// `CreateFailed` if a log file cannot be created; `StartFailed` if the
/// spawn fails.
#[allow(unsafe_code)]
fn spawn_sandboxed_process(params: &SandboxSpawnParams) -> Result<u32> {
    use std::os::unix::process::CommandExt;
    let profile = params.sbpl_profile.clone();
    let max_files = params.sandbox_config.max_files;
    let cpu_time_limit = params.sandbox_config.cpu_time_limit;
    let mut env_vars: Vec<(String, String)> = params
        .spec
        .env
        .iter()
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    let port_str = params.assigned_port.to_string();
    // Respect a user-provided PORT; always expose ZLAYER_PORT.
    if !env_vars.iter().any(|(k, _)| k == "PORT") {
        env_vars.push(("PORT".to_string(), port_str.clone()));
    }
    env_vars.push(("ZLAYER_PORT".to_string(), port_str));
    if let Some((ref api_url, ref token, ref socket)) = params.auth_env {
        env_vars.push(("ZLAYER_API_URL".to_string(), api_url.clone()));
        env_vars.push(("ZLAYER_TOKEN".to_string(), token.clone()));
        env_vars.push(("ZLAYER_SOCKET".to_string(), socket.clone()));
    }
    let stdout_file =
        std::fs::File::create(&params.stdout_path).map_err(|e| AgentError::CreateFailed {
            id: "sandbox-process".to_string(),
            reason: format!("Failed to create stdout log: {e}"),
        })?;
    let stderr_file =
        std::fs::File::create(&params.stderr_path).map_err(|e| AgentError::CreateFailed {
            id: "sandbox-process".to_string(),
            reason: format!("Failed to create stderr log: {e}"),
        })?;
    // SAFETY: the pre_exec closure runs in the forked child before exec and
    // only calls sandbox_init + setrlimit. NOTE(review): sandbox_init may
    // allocate, which is not strictly async-signal-safe after fork from a
    // multithreaded parent -- confirm this is acceptable for this spawn path.
    let child = unsafe {
        std::process::Command::new(&params.program)
            .args(&params.args)
            .current_dir(&params.rootfs_dir)
            .stdout(stdout_file)
            .stderr(stderr_file)
            .env_clear()
            .envs(env_vars)
            .pre_exec(move || {
                apply_seatbelt_profile(&profile)?;
                set_resource_limits(max_files, cpu_time_limit)?;
                Ok(())
            })
            .spawn()
    }
    .map_err(|e| AgentError::StartFailed {
        id: "sandbox-process".to_string(),
        reason: format!("Failed to spawn sandboxed process: {e}"),
    })?;
    Ok(child.id())
}
/// Ask the OS for an ephemeral loopback TCP port and return it together with
/// the bound listener, which acts as a guard keeping the port reserved until
/// it is handed to the workload.
fn reserve_port() -> std::io::Result<(u16, std::net::TcpListener)> {
    let guard = std::net::TcpListener::bind("127.0.0.1:0")?;
    let addr = guard.local_addr()?;
    Ok((addr.port(), guard))
}
/// Book-keeping for one sandboxed container process.
#[derive(Debug)]
struct SandboxContainer {
// OS pid of the spawned process.
pid: u32,
// Last known lifecycle state.
state: ContainerState,
// Per-container state directory under <data_dir>/containers.
state_dir: PathBuf,
// Cloned rootfs the process runs in.
rootfs_dir: PathBuf,
// Log file receiving the child's stdout.
stdout_path: PathBuf,
// Log file receiving the child's stderr.
stderr_path: PathBuf,
// Timestamp of the most recent start, if the container has been started.
started_at: Option<Instant>,
// The service spec this container was created from.
spec: ServiceSpec,
// Sandbox profile/limit inputs used at spawn time.
sandbox_config: SandboxConfig,
// RSS ceiling in bytes enforced by the memory watchdog, if any.
memory_limit: Option<u64>,
// Handle to the memory watchdog task, if one is running.
watchdog_handle: Option<tokio::task::JoinHandle<()>>,
// Localhost port reserved for the service (exported as PORT/ZLAYER_PORT).
assigned_port: u16,
// Listener keeping `assigned_port` reserved until the workload binds it.
port_guard: Option<std::net::TcpListener>,
}
/// Process-wide counter making rootfs staging directory names unique
/// (combined with pid + nanosecond timestamp in `register_local_rootfs`).
static STAGING_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Container runtime that isolates workloads with macOS Seatbelt (SBPL)
/// profiles rather than kernel namespaces/cgroups.
pub struct SandboxRuntime {
// Runtime-wide configuration (data/log dirs, GPU toggle).
config: MacSandboxConfig,
// Live containers; presumably keyed by "<service>-<replica>" -- the
// insertion site is outside this view, confirm against create_container.
containers: Arc<RwLock<HashMap<String, SandboxContainer>>>,
// Sanitized image name -> extracted rootfs path.
image_rootfs: Arc<RwLock<HashMap<String, PathBuf>>>,
// Optional credentials injected into containers as ZLAYER_* env vars.
auth_context: Option<crate::runtime::ContainerAuthContext>,
}
// Manual Debug: print only `config`; the container maps and auth context are
// elided via `finish_non_exhaustive` (auth may hold sensitive tokens).
impl std::fmt::Debug for SandboxRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SandboxRuntime")
.field("config", &self.config)
.finish_non_exhaustive()
}
}
impl SandboxRuntime {
/// Build the runtime and create its on-disk layout: `data_dir`, `log_dir`,
/// and the `containers/` and `images/` subdirectories.
///
/// # Errors
/// `AgentError::Configuration` if any of the directories cannot be created.
pub fn new(
config: MacSandboxConfig,
auth_context: Option<crate::runtime::ContainerAuthContext>,
) -> Result<Self> {
std::fs::create_dir_all(&config.data_dir).map_err(|e| {
AgentError::Configuration(format!(
"Failed to create data dir {}: {e}",
config.data_dir.display(),
))
})?;
std::fs::create_dir_all(&config.log_dir).map_err(|e| {
AgentError::Configuration(format!(
"Failed to create log dir {}: {e}",
config.log_dir.display(),
))
})?;
std::fs::create_dir_all(config.data_dir.join("containers")).map_err(|e| {
AgentError::Configuration(format!("Failed to create containers dir: {e}"))
})?;
std::fs::create_dir_all(config.data_dir.join("images"))
.map_err(|e| AgentError::Configuration(format!("Failed to create images dir: {e}")))?;
tracing::info!(
data_dir = %config.data_dir.display(),
log_dir = %config.log_dir.display(),
gpu_access = config.gpu_access,
"macOS sandbox runtime initialized"
);
Ok(Self {
config,
containers: Arc::new(RwLock::new(HashMap::new())),
image_rootfs: Arc::new(RwLock::new(HashMap::new())),
auth_context,
})
}
/// Borrow the runtime configuration.
#[must_use]
pub fn config(&self) -> &MacSandboxConfig {
&self.config
}
// Per-container directory name: "<service>-<replica>".
fn container_dir_name(id: &ContainerId) -> String {
format!("{}-{}", id.service, id.replica)
}
// State directory for one container: <data_dir>/containers/<service>-<replica>.
fn container_dir(&self, id: &ContainerId) -> PathBuf {
self.config
.data_dir
.join("containers")
.join(Self::container_dir_name(id))
}
// Image store root: <data_dir>/images.
fn images_dir(&self) -> PathBuf {
self.config.data_dir.join("images")
}
/// Register a local directory as the rootfs for `image` by cloning it into
/// the image store and recording it in the in-memory index.
///
/// Uses a unique staging directory plus an atomic rename so concurrent
/// registrations of the same image are safe: if the rename loses the race
/// but a rootfs now exists, the winner's copy is used.
///
/// # Errors
/// `AgentError::PullFailed` if directory creation, cloning, or finalizing
/// fails with no competing clone having succeeded.
#[allow(clippy::too_many_lines)]
pub async fn register_local_rootfs(&self, image: &str, source_dir: &Path) -> Result<()> {
let safe_name = sanitize_image_name(image);
let image_dir = self.images_dir().join(&safe_name);
let rootfs_dir = image_dir.join("rootfs");
// Fast path: rootfs already present, just (re)index it.
if rootfs_dir.exists() {
let mut images = self.image_rootfs.write().await;
images.insert(safe_name, rootfs_dir);
return Ok(());
}
tokio::fs::create_dir_all(&image_dir)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("Failed to create image dir: {e}"),
})?;
// Staging name is unique per process/time/counter so concurrent callers
// never collide. NOTE(review): unwrap panics if the system clock is
// before UNIX_EPOCH -- not a supported configuration.
let staging_name = format!(
".rootfs-staging-{}-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos(),
STAGING_COUNTER.fetch_add(1, Ordering::Relaxed)
);
let staging_dir = image_dir.join(&staging_name);
clone_directory_recursive(source_dir, &staging_dir)
.await
.map_err(|e| {
// Best-effort cleanup of the partial staging copy.
let _ = std::fs::remove_dir_all(&staging_dir);
AgentError::PullFailed {
image: image.to_string(),
reason: format!(
"Failed to clone local rootfs from {}: {e}",
source_dir.display(),
),
}
})?;
// Atomic publish: rename staging -> rootfs. If the rename fails but a
// rootfs exists, a concurrent registration won the race -- use theirs.
if tokio::fs::rename(&staging_dir, &rootfs_dir).await.is_err() {
let _ = tokio::fs::remove_dir_all(&staging_dir).await;
if !rootfs_dir.exists() {
return Err(AgentError::PullFailed {
image: image.to_string(),
reason: "Failed to finalize rootfs and no other clone succeeded".into(),
});
}
}
let mut images = self.image_rootfs.write().await;
images.insert(safe_name, rootfs_dir.clone());
tracing::info!(
image = %image,
source = %source_dir.display(),
rootfs = %rootfs_dir.display(),
"Registered local rootfs as image"
);
Ok(())
}
}
#[async_trait::async_trait]
impl Runtime for SandboxRuntime {
/// Pull with the default `IfNotPresent` policy and anonymous registry auth.
async fn pull_image(&self, image: &str) -> Result<()> {
self.pull_image_with_policy(image, zlayer_spec::PullPolicy::IfNotPresent, None)
.await
}
/// Pull `image` into the image store according to `policy`.
///
/// Layers are fetched through a persistent blob cache and unpacked to
/// `<data_dir>/images/<sanitized-name>/rootfs`, which is then recorded in
/// the in-memory `image_rootfs` index.
///
/// Fix: a forced re-pull (`Always`/`Newer`) now removes any existing rootfs
/// before extraction; previously new layers were unpacked over the stale
/// tree, so files deleted in the new image silently survived.
///
/// # Errors
/// `AgentError::PullFailed` if the image is absent under `Never`, or if any
/// cache/pull/unpack step fails.
#[allow(clippy::too_many_lines)]
async fn pull_image_with_policy(
    &self,
    image: &str,
    policy: zlayer_spec::PullPolicy,
    _auth: Option<&RegistryAuth>,
) -> Result<()> {
    let safe_name = sanitize_image_name(image);
    let image_dir = self.images_dir().join(&safe_name);
    let rootfs_dir = image_dir.join("rootfs");
    match policy {
        zlayer_spec::PullPolicy::Always | zlayer_spec::PullPolicy::Newer => {
            // Fall through to a fresh pull. NOTE(review): `Newer` currently
            // behaves like `Always` -- no digest/freshness comparison is done.
        }
        zlayer_spec::PullPolicy::IfNotPresent => {
            if rootfs_dir.exists() {
                tracing::debug!(image = %image, "Image already present, skipping pull");
                let mut images = self.image_rootfs.write().await;
                images.insert(safe_name, rootfs_dir);
                return Ok(());
            }
        }
        zlayer_spec::PullPolicy::Never => {
            if !rootfs_dir.exists() {
                return Err(AgentError::PullFailed {
                    image: image.to_string(),
                    reason: "Image not present and pull policy is Never".to_string(),
                });
            }
            let mut images = self.image_rootfs.write().await;
            images.insert(safe_name, rootfs_dir);
            return Ok(());
        }
    }
    tracing::info!(
        image = %image,
        "Pulling image for macOS sandbox runtime \
        (note: sandbox runtime expects macOS-native images)"
    );
    // Forced re-pulls must not layer the new image over leftovers of the
    // previous one: files deleted upstream would otherwise survive.
    if rootfs_dir.exists() {
        tokio::fs::remove_dir_all(&rootfs_dir)
            .await
            .map_err(|e| AgentError::PullFailed {
                image: image.to_string(),
                reason: format!("Failed to remove stale rootfs: {e}"),
            })?;
    }
    tokio::fs::create_dir_all(&rootfs_dir)
        .await
        .map_err(|e| AgentError::PullFailed {
            image: image.to_string(),
            reason: format!("Failed to create rootfs dir: {e}"),
        })?;
    // Blob cache is persisted next to the image store so repeated pulls
    // reuse downloaded layers.
    let cache_path = self.images_dir().join("blobs.redb");
    let cache_type = zlayer_registry::CacheType::persistent_at(&cache_path);
    let blob_cache = cache_type
        .build()
        .await
        .map_err(|e| AgentError::PullFailed {
            image: image.to_string(),
            reason: format!("Failed to open blob cache: {e}"),
        })?;
    let puller = zlayer_registry::ImagePuller::with_cache(blob_cache);
    // NOTE(review): `_auth` is ignored -- pulls are always anonymous here.
    let auth = zlayer_registry::RegistryAuth::Anonymous;
    let layers = puller
        .pull_image(image, &auth)
        .await
        .map_err(|e| AgentError::PullFailed {
            image: image.to_string(),
            reason: format!("Failed to pull image layers: {e}"),
        })?;
    tracing::info!(
        image = %image,
        layer_count = layers.len(),
        "Extracting layers to image rootfs"
    );
    // NOTE(review): extraction is not staged/atomic -- a failed unpack can
    // leave a partial rootfs that IfNotPresent will later treat as present.
    // Consider the staging+rename pattern used by register_local_rootfs.
    let mut unpacker = zlayer_registry::LayerUnpacker::new(rootfs_dir.clone());
    unpacker
        .unpack_layers(&layers)
        .await
        .map_err(|e| AgentError::PullFailed {
            image: image.to_string(),
            reason: format!("Failed to extract rootfs: {e}"),
        })?;
    let mut images = self.image_rootfs.write().await;
    images.insert(safe_name, rootfs_dir.clone());
    tracing::info!(
        image = %image,
        rootfs = %rootfs_dir.display(),
        "Image pulled successfully"
    );
    Ok(())
}
/// Create the on-disk and in-memory state for a new container without
/// launching any process: clean stale state, clone the image rootfs
/// (APFS copy-on-write), derive GPU/network sandbox policy from
/// `spec`, reserve a dynamic localhost port, write the Seatbelt
/// profile and config.json, then register the container as `Pending`.
///
/// # Errors
/// Returns `CreateFailed` on any filesystem, image-lookup, or
/// port-reservation failure.
#[allow(clippy::too_many_lines)]
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let container_dir = self.container_dir(id);
let rootfs_dir = container_dir.join("rootfs");
// A directory left over from a crashed/previous run is removed
// best-effort; removal failure is logged and creation continues.
if container_dir.exists() {
tracing::warn!(
container = %dir_name,
"Stale container directory found, cleaning up"
);
if let Err(e) = tokio::fs::remove_dir_all(&container_dir).await {
tracing::warn!(
container = %dir_name,
error = %e,
"Failed to remove stale container directory"
);
}
}
tokio::fs::create_dir_all(&container_dir)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!(
"Failed to create container dir {}: {e}",
container_dir.display(),
),
})?;
// Private tmp directory for the sandboxed process.
tokio::fs::create_dir_all(container_dir.join("tmp"))
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to create container tmp dir: {e}"),
})?;
// Locate the extracted image rootfs: prefer the mapping cached by
// pull_image, falling back to the conventional images-dir layout.
let image_name_str = spec.image.name.to_string();
let safe_image_name = sanitize_image_name(&image_name_str);
let image_rootfs = {
let images = self.image_rootfs.read().await;
images.get(&safe_image_name).cloned()
};
let image_rootfs =
image_rootfs.unwrap_or_else(|| self.images_dir().join(&safe_image_name).join("rootfs"));
if !image_rootfs.exists() {
return Err(AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!(
"Image rootfs not found at {}. Run pull_image first.",
image_rootfs.display()
),
});
}
tracing::debug!(
container = %dir_name,
src = %image_rootfs.display(),
dst = %rootfs_dir.display(),
"Cloning rootfs (APFS CoW)"
);
clone_directory_recursive(&image_rootfs, &rootfs_dir)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!(
"Failed to clone rootfs from {} to {}: {e}",
image_rootfs.display(),
rootfs_dir.display(),
),
})?;
// GPU policy: only granted when the runtime config allows GPU access
// AND the spec requests an "apple" GPU; mode "mps" narrows access,
// anything else gets full Metal compute.
let gpu_access = if self.config.gpu_access {
if let Some(ref gpu) = spec.resources.gpu {
if gpu.vendor == "apple" {
match gpu.mode.as_deref() {
Some("mps") => GpuAccess::MpsOnly,
_ => GpuAccess::MetalCompute,
}
} else {
GpuAccess::None
}
} else {
GpuAccess::None
}
} else {
GpuAccess::None
};
// Reserve a dynamic port; the guard is stored on the container and
// released by start_container just before the process is spawned.
let (assigned_port, port_guard) = reserve_port().map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to reserve a dynamic port for sandbox container: {e}",),
})?;
tracing::info!(
container = %dir_name,
assigned_port = assigned_port,
"Reserved dynamic port for sandbox container"
);
// Inject the assigned port into whatever network policy the spec
// produced; a spec with no network access is upgraded to
// localhost-only so the service can still be reached.
let mut network_access = build_network_access(spec);
match &mut network_access {
NetworkAccess::LocalhostOnly {
ref mut bind_ports,
ref mut connect_ports,
} => {
if !bind_ports.contains(&assigned_port) {
bind_ports.push(assigned_port);
}
if !connect_ports.contains(&assigned_port) {
connect_ports.push(assigned_port);
}
}
NetworkAccess::None => {
network_access = NetworkAccess::LocalhostOnly {
bind_ports: vec![assigned_port],
connect_ports: vec![assigned_port, 53, 80, 443],
};
}
NetworkAccess::Full => {
// Full access already covers the assigned port.
}
}
let writable_dirs = build_writable_dirs(spec, &container_dir);
// Memory limit comes from the spec's human-readable string, if any.
let memory_limit = spec
.resources
.memory
.as_ref()
.and_then(|m| parse_memory_string(m));
let mut sandbox_config = SandboxConfig {
rootfs_dir: rootfs_dir.clone(),
workspace_dir: container_dir.clone(),
gpu_access,
network_access,
writable_dirs,
readonly_dirs: vec![],
max_files: 4096,
cpu_time_limit: None,
memory_limit,
};
// The agent auth socket must be reachable from inside the sandbox.
if let Some(ref auth_ctx) = self.auth_context {
sandbox_config
.writable_dirs
.push(PathBuf::from(&auth_ctx.socket_path));
}
// Persist the Seatbelt profile; start_container re-reads it from disk.
let profile = generate_sandbox_profile(&sandbox_config);
let profile_path = container_dir.join("sandbox.sb");
tokio::fs::write(&profile_path, &profile)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to write Seatbelt profile: {e}"),
})?;
// Persist the spec alongside the container state for inspection.
let config_json =
serde_json::to_string_pretty(spec).map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to serialize spec: {e}"),
})?;
tokio::fs::write(container_dir.join("config.json"), &config_json)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to write config.json: {e}"),
})?;
let stdout_path = container_dir.join("stdout.log");
let stderr_path = container_dir.join("stderr.log");
// pid 0 + Pending marks "created but not started".
let container = SandboxContainer {
pid: 0,
state: ContainerState::Pending,
state_dir: container_dir,
rootfs_dir,
stdout_path,
stderr_path,
started_at: None,
spec: spec.clone(),
sandbox_config,
memory_limit,
watchdog_handle: None,
assigned_port,
port_guard: Some(port_guard),
};
let mut containers = self.containers.write().await;
containers.insert(dir_name.clone(), container);
tracing::info!(
container = %dir_name,
image = %spec.image.name,
port = assigned_port,
"Container created (sandbox)"
);
Ok(())
}
/// Start the process for an already-created container: read back the
/// Seatbelt profile, resolve the entrypoint, spawn it sandboxed in a
/// blocking task, persist the PID, and mark the container `Running`.
/// When a memory limit is set, a watchdog task is spawned for it.
///
/// # Errors
/// `NotFound` if the container was never created; `StartFailed` on
/// profile read, spawn, or PID-file write failures.
#[allow(clippy::too_many_lines)]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
let dir_name = Self::container_dir_name(id);
// Snapshot everything needed for the spawn under a short read lock
// so no lock is held across the blocking spawn below.
let (
rootfs_dir,
stdout_path,
stderr_path,
spec,
sandbox_config,
memory_limit,
assigned_port,
) = {
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not created".to_string(),
})?;
(
container.rootfs_dir.clone(),
container.stdout_path.clone(),
container.stderr_path.clone(),
container.spec.clone(),
container.sandbox_config.clone(),
container.memory_limit,
container.assigned_port,
)
};
// The profile is re-read from disk (written by create_container).
let profile_path = self.container_dir(id).join("sandbox.sb");
let profile = tokio::fs::read_to_string(&profile_path)
.await
.map_err(|e| AgentError::StartFailed {
id: dir_name.clone(),
reason: format!("Failed to read Seatbelt profile: {e}"),
})?;
let (program, args) = resolve_entrypoint(&spec, &rootfs_dir)?;
tracing::info!(
container = %dir_name,
program = %program,
args = ?args,
port = assigned_port,
"Starting sandboxed process"
);
// Drop the port reservation so the child can bind the assigned port.
// NOTE(review): this leaves a brief window in which another process
// could claim the port before the child binds it -- confirm this is
// acceptable.
{
let mut containers = self.containers.write().await;
if let Some(container) = containers.get_mut(&dir_name) {
container.port_guard.take();
}
}
// Mint a long-lived (365-day) container token when auth is configured.
let auth_env = self.auth_context.as_ref().map(|ctx| {
let token = crate::auth::mint_container_token(
&ctx.jwt_secret,
&id.service,
&format!("{}-{}", id.service, id.replica),
std::time::Duration::from_secs(86400 * 365),
)
// NOTE(review): a minting failure silently yields an empty token.
.unwrap_or_default();
(ctx.api_url.clone(), token, ctx.socket_path.clone())
});
let dir_name_for_err = dir_name.clone();
// spawn_sandboxed_process does blocking work, so run it off the async
// executor. The first `?` is the join error, the second the spawn error.
let child_pid = tokio::task::spawn_blocking(move || {
spawn_sandboxed_process(&SandboxSpawnParams {
program,
args,
sbpl_profile: profile,
rootfs_dir,
stdout_path,
stderr_path,
spec,
sandbox_config,
assigned_port,
auth_env,
})
})
.await
.map_err(|e| AgentError::StartFailed {
id: dir_name_for_err,
reason: format!("spawn task join error: {e}"),
})??;
// Persist the PID for external inspection/recovery.
let pid_path = self.container_dir(id).join("pid");
tokio::fs::write(&pid_path, child_pid.to_string())
.await
.map_err(|e| AgentError::StartFailed {
id: dir_name.clone(),
reason: format!("Failed to write PID file: {e}"),
})?;
let mut containers = self.containers.write().await;
if let Some(container) = containers.get_mut(&dir_name) {
container.pid = child_pid;
container.state = ContainerState::Running;
container.started_at = Some(Instant::now());
// Enforce the memory limit via a background watchdog task; the
// handle is kept so it can be aborted on stop/remove.
if let Some(limit) = memory_limit {
let pid = child_pid;
let handle = tokio::spawn(async move {
memory_watchdog(pid, limit).await;
});
container.watchdog_handle = Some(handle);
}
}
tracing::info!(
container = %dir_name,
pid = child_pid,
"Sandboxed process started"
);
Ok(())
}
/// Stop a running container: send SIGTERM, poll for exit until
/// `timeout` elapses, then escalate to SIGKILL.
///
/// A container that was never started (pid == 0) is simply marked
/// exited with code 0. After a SIGKILL escalation the recorded exit
/// code is always -9, regardless of the actual wait status.
#[allow(unsafe_code)]
#[allow(clippy::too_many_lines)]
#[allow(clippy::cast_possible_wrap)]
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let pid = {
let mut containers = self.containers.write().await;
let container = containers
.get_mut(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
// Never started: nothing to signal.
if container.pid == 0 {
container.state = ContainerState::Exited { code: 0 };
return Ok(());
}
container.state = ContainerState::Stopping;
container.pid
};
tracing::info!(
container = %dir_name,
pid = pid,
timeout = ?timeout,
"Stopping sandboxed process"
);
// SAFETY: plain kill(2) on a pid we recorded; no memory-safety
// obligations.
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
// Poll non-blockingly every 100ms until the deadline.
let deadline = Instant::now() + timeout;
loop {
if Instant::now() >= deadline {
break;
}
let mut status: libc::c_int = 0;
// SAFETY: status is a valid, writable c_int.
let result = unsafe { libc::waitpid(pid as i32, &raw mut status, libc::WNOHANG) };
if result > 0 {
// Reaped: normal exits report their code; signal deaths -1.
let exit_code = if libc::WIFEXITED(status) {
libc::WEXITSTATUS(status)
} else {
-1
};
let mut containers = self.containers.write().await;
if let Some(c) = containers.get_mut(&dir_name) {
c.state = ContainerState::Exited { code: exit_code };
if let Some(h) = c.watchdog_handle.take() {
h.abort();
}
}
tracing::info!(
container = %dir_name,
exit_code = exit_code,
"Container stopped gracefully"
);
return Ok(());
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
tracing::warn!(
container = %dir_name,
pid = pid,
"SIGTERM timeout, sending SIGKILL"
);
// SAFETY: plain kill(2).
unsafe {
libc::kill(pid as i32, libc::SIGKILL);
}
// Best-effort reap (up to 3s) on a blocking thread to avoid leaving
// a zombie; the reported exit code is a fixed -9 (SIGKILL).
let pid_for_wait = pid;
let exit_code = tokio::task::spawn_blocking(move || {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
loop {
let mut status: libc::c_int = 0;
// SAFETY: status is a valid, writable c_int.
let result =
unsafe { libc::waitpid(pid_for_wait as i32, &raw mut status, libc::WNOHANG) };
if result > 0 || result == -1 {
break; }
if std::time::Instant::now() >= deadline {
break; }
std::thread::sleep(std::time::Duration::from_millis(50));
}
-9i32
})
.await
.unwrap_or(-9);
let mut containers = self.containers.write().await;
if let Some(c) = containers.get_mut(&dir_name) {
c.state = ContainerState::Exited { code: exit_code };
if let Some(h) = c.watchdog_handle.take() {
h.abort();
}
}
tracing::info!(container = %dir_name, "Container killed (SIGKILL)");
Ok(())
}
/// Remove a container entirely: abort its watchdog, SIGKILL the
/// process if it is still running or stopping, reap it best-effort,
/// and delete the container's state directory. Removing an unknown
/// container is not an error (only the directory cleanup runs).
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_wrap)]
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
let dir_name = Self::container_dir_name(id);
tracing::info!(container = %dir_name, "Removing container");
{
let mut containers = self.containers.write().await;
if let Some(mut c) = containers.remove(&dir_name) {
if let Some(h) = c.watchdog_handle.take() {
h.abort();
}
// Force-kill anything still alive before deleting its state.
if c.pid > 0
&& matches!(c.state, ContainerState::Running | ContainerState::Stopping)
{
// SAFETY: plain kill(2); no memory-safety obligations.
unsafe {
libc::kill(c.pid as i32, libc::SIGKILL);
}
// Best-effort reap (up to 3s) on a blocking thread to avoid
// leaving a zombie; any failure is deliberately ignored.
let pid = c.pid;
let _ = tokio::task::spawn_blocking(move || {
let deadline =
std::time::Instant::now() + std::time::Duration::from_secs(3);
loop {
let mut status: libc::c_int = 0;
// SAFETY: status is a valid, writable c_int.
let result = unsafe {
libc::waitpid(pid as i32, &raw mut status, libc::WNOHANG)
};
if result > 0 || result == -1 {
break;
}
if std::time::Instant::now() >= deadline {
break;
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
})
.await;
}
}
}
// Remove the on-disk state regardless of whether the container was
// present in the in-memory table.
let container_dir = self.container_dir(id);
if container_dir.exists() {
tokio::fs::remove_dir_all(&container_dir)
.await
.map_err(|e| {
AgentError::Internal(format!(
"Failed to remove container dir {}: {e}",
container_dir.display(),
))
})?;
}
tracing::info!(container = %dir_name, "Container removed");
Ok(())
}
/// Report the container's current state, polling the OS for liveness
/// when the cached state says the process should be running.
///
/// Terminal (`Exited`/`Failed`) and `Pending` states are returned
/// straight from the cache; otherwise one non-blocking waitpid
/// updates the cached state before it is returned.
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_wrap)]
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
let dir_name = Self::container_dir_name(id);
let mut containers = self.containers.write().await;
let container = containers
.get_mut(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
match &container.state {
ContainerState::Exited { .. } | ContainerState::Failed { .. } => {
return Ok(container.state.clone());
}
ContainerState::Pending => return Ok(ContainerState::Pending),
_ => {}
}
if container.pid > 0 {
let mut status: libc::c_int = 0;
// SAFETY: status is a valid, writable c_int.
let result =
unsafe { libc::waitpid(container.pid as i32, &raw mut status, libc::WNOHANG) };
match result.cmp(&0) {
std::cmp::Ordering::Greater => {
// Child reaped here: normal exits keep their code; signal
// deaths are encoded as the negated signal number.
let exit_code = if libc::WIFEXITED(status) {
libc::WEXITSTATUS(status)
} else if libc::WIFSIGNALED(status) {
-(libc::WTERMSIG(status))
} else {
-1
};
container.state = ContainerState::Exited { code: exit_code };
if let Some(h) = container.watchdog_handle.take() {
h.abort();
}
}
std::cmp::Ordering::Equal => {
// waitpid returned 0: child exists and has not exited.
container.state = ContainerState::Running;
}
std::cmp::Ordering::Less => {
// waitpid error (e.g. the child was already reaped elsewhere).
container.state = ContainerState::Failed {
reason: "Process disappeared".to_string(),
};
}
}
}
Ok(container.state.clone())
}
/// Read the container's captured stdout/stderr files into `LogEntry`
/// values, optionally keeping only the last `tail` entries.
///
/// All entries share a single read-time timestamp; unreadable log
/// files are silently skipped.
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
    let dir_name = Self::container_dir_name(id);
    let (stdout_path, stderr_path) = {
        let containers = self.containers.read().await;
        let Some(container) = containers.get(&dir_name) else {
            return Err(AgentError::NotFound {
                container: dir_name.clone(),
                reason: "Container not found".to_string(),
            });
        };
        (container.stdout_path.clone(), container.stderr_path.clone())
    };
    let now = chrono::Utc::now();
    let source = LogSource::Container(id.to_string());
    let mut entries = Vec::new();
    // stdout entries first, then stderr, matching the capture files.
    if let Ok(text) = tokio::fs::read_to_string(&stdout_path).await {
        entries.extend(text.lines().map(|line| LogEntry {
            timestamp: now,
            stream: LogStream::Stdout,
            message: line.to_string(),
            source: source.clone(),
            service: None,
            deployment: None,
        }));
    }
    if let Ok(text) = tokio::fs::read_to_string(&stderr_path).await {
        entries.extend(text.lines().map(|line| LogEntry {
            timestamp: now,
            stream: LogStream::Stderr,
            message: line.to_string(),
            source: source.clone(),
            service: None,
            deployment: None,
        }));
    }
    // tail == 0 means "no limit"; otherwise keep only the newest
    // `tail` entries.
    if tail > 0 && entries.len() > tail {
        let excess = entries.len() - tail;
        entries.drain(..excess);
    }
    Ok(entries)
}
/// Run `cmd` inside the container's Seatbelt sandbox and capture its
/// output.
///
/// The command runs under `/usr/bin/sandbox-exec` with the container's
/// stored profile, with the container rootfs as the working directory.
/// Returns `(exit_code, stdout, stderr)`; a missing exit code (process
/// killed by a signal) is reported as -1.
///
/// # Errors
/// `InvalidSpec` if `cmd` is empty; `NotFound` if the container does
/// not exist; `Internal` on profile-read or spawn failures.
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
    let dir_name = Self::container_dir_name(id);
    if cmd.is_empty() {
        return Err(AgentError::InvalidSpec(
            "exec command cannot be empty".to_string(),
        ));
    }
    let (rootfs, profile_path) = {
        let containers = self.containers.read().await;
        let container = containers
            .get(&dir_name)
            .ok_or_else(|| AgentError::NotFound {
                container: dir_name.clone(),
                reason: "Container not found".to_string(),
            })?;
        (
            container.rootfs_dir.clone(),
            container.state_dir.join("sandbox.sb"),
        )
    };
    let profile = tokio::fs::read_to_string(&profile_path)
        .await
        .map_err(|e| AgentError::Internal(format!("Failed to read Seatbelt profile: {e}")))?;
    tracing::debug!(
        container = %dir_name,
        cmd = ?cmd,
        "Executing command in sandbox"
    );
    // Move `profile` and `rootfs` into the blocking task instead of
    // cloning them (the originals were not used again, so the clones
    // were pure waste); only the borrowed `cmd` needs an owned copy.
    let cmd_owned = cmd.to_vec();
    let output = tokio::task::spawn_blocking(move || {
        std::process::Command::new("/usr/bin/sandbox-exec")
            .arg("-p")
            .arg(&profile)
            // `--` terminates sandbox-exec's own options; everything
            // after it is the sandboxed argv.
            .arg("--")
            .args(&cmd_owned)
            .current_dir(&rootfs)
            .output()
    })
    .await
    .map_err(|e| AgentError::Internal(format!("exec task join error: {e}")))?
    .map_err(|e| AgentError::Internal(format!("Failed to exec: {e}")))?;
    let exit_code = output.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
    tracing::debug!(
        container = %dir_name,
        exit_code = exit_code,
        stdout_len = stdout.len(),
        stderr_len = stderr.len(),
        "exec completed"
    );
    Ok((exit_code, stdout, stderr))
}
/// Sample CPU time and memory usage of the container's root process.
///
/// # Errors
/// `NotFound` for an unknown container; `Internal` if the container
/// was never started or the stats query fails.
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
    let dir_name = Self::container_dir_name(id);
    let (pid, memory_limit) = {
        let containers = self.containers.read().await;
        let Some(entry) = containers.get(&dir_name) else {
            return Err(AgentError::NotFound {
                container: dir_name.clone(),
                reason: "Container not found".to_string(),
            });
        };
        if entry.pid == 0 {
            return Err(AgentError::Internal(
                "Container not started -- no PID available for stats".to_string(),
            ));
        }
        // A missing configured limit is reported as 0.
        (entry.pid, entry.memory_limit.unwrap_or(0))
    };
    // The host process query is blocking, so run it off the executor.
    let (cpu_usec, memory_bytes) = tokio::task::spawn_blocking(move || get_process_stats(pid))
        .await
        .map_err(|e| AgentError::Internal(format!("stats task join error: {e}")))??;
    Ok(ContainerStats {
        cpu_usage_usec: cpu_usec,
        memory_bytes,
        memory_limit,
        timestamp: Instant::now(),
    })
}
/// Block until the container's process exits and return its exit
/// code (negated signal number for signal deaths, -1 on wait errors).
///
/// Returns the cached code immediately if the container has already
/// exited.
/// NOTE(review): if another code path (e.g. `container_state`) reaps
/// the child first, the blocking waitpid here fails and -1 is
/// reported -- confirm this conflation of "error" and "exit code -1"
/// is acceptable.
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_wrap)]
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
let dir_name = Self::container_dir_name(id);
let pid = {
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
if let ContainerState::Exited { code } = &container.state {
return Ok(*code);
}
if container.pid == 0 {
return Err(AgentError::Internal(
"Container not started -- no PID to wait on".to_string(),
));
}
container.pid
};
tracing::debug!(container = %dir_name, pid = pid, "Waiting for container to exit");
// The blocking wait runs on a dedicated thread so the async executor
// stays free.
let exit_code = tokio::task::spawn_blocking(move || {
let mut status: libc::c_int = 0;
// SAFETY: status is a valid, writable c_int.
let result = unsafe { libc::waitpid(pid as i32, &raw mut status, 0) };
if result < 0 {
return -1;
}
if libc::WIFEXITED(status) {
libc::WEXITSTATUS(status)
} else if libc::WIFSIGNALED(status) {
-(libc::WTERMSIG(status))
} else {
-1
}
})
.await
.map_err(|e| AgentError::Internal(format!("wait task join error: {e}")))?;
let mut containers = self.containers.write().await;
if let Some(c) = containers.get_mut(&dir_name) {
c.state = ContainerState::Exited { code: exit_code };
if let Some(h) = c.watchdog_handle.take() {
h.abort();
}
}
tracing::info!(
container = %dir_name,
exit_code = exit_code,
"Container exited"
);
Ok(exit_code)
}
/// Return every captured stdout/stderr line for the container.
///
/// This body previously duplicated the log-reading logic verbatim;
/// delegating to `container_logs` with `tail == 0` (which means
/// "no limit" there) keeps the two paths from drifting apart.
///
/// # Errors
/// `NotFound` if the container does not exist.
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
    self.container_logs(id, 0).await
}
/// Look up the host PID of the container's root process.
///
/// Returns `Ok(None)` when the container exists but was never
/// started (PID still 0).
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
    let dir_name = Self::container_dir_name(id);
    let containers = self.containers.read().await;
    let Some(container) = containers.get(&dir_name) else {
        return Err(AgentError::NotFound {
            container: dir_name.clone(),
            reason: "Container not found".to_string(),
        });
    };
    // PID 0 is the "never started" sentinel.
    Ok((container.pid > 0).then_some(container.pid))
}
/// Report the container's IP address.
///
/// A known container is always reported as the IPv4 loopback address.
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
    let dir_name = Self::container_dir_name(id);
    let exists = self.containers.read().await.contains_key(&dir_name);
    if !exists {
        return Err(AgentError::NotFound {
            container: dir_name,
            reason: "Container not found".to_string(),
        });
    }
    Ok(Some(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)))
}
/// Return the dynamically reserved host port for the container.
async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
    let dir_name = Self::container_dir_name(id);
    let containers = self.containers.read().await;
    match containers.get(&dir_name) {
        Some(container) => Ok(Some(container.assigned_port)),
        None => Err(AgentError::NotFound {
            container: dir_name,
            reason: "Container not found".to_string(),
        }),
    }
}
/// Deliver `signal` (default SIGKILL) to the container's process.
///
/// # Errors
/// `NotFound` for an unknown container, `InvalidSpec` when the
/// container has no PID or the signal is unsupported, `Internal`
/// when kill(2) itself fails.
#[allow(unsafe_code)]
#[allow(clippy::cast_possible_wrap)]
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
    // Normalize the requested signal name first.
    let canonical = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
    let dir_name = Self::container_dir_name(id);
    let pid = {
        let containers = self.containers.read().await;
        containers
            .get(&dir_name)
            .map(|c| c.pid)
            .ok_or_else(|| AgentError::NotFound {
                container: dir_name.clone(),
                reason: "Container not found".to_string(),
            })?
    };
    if pid == 0 {
        return Err(AgentError::InvalidSpec(format!(
            "container '{dir_name}' is not running (no pid)"
        )));
    }
    // Translate the canonical name into a libc signal number.
    let signum = match canonical.as_str() {
        "SIGKILL" => libc::SIGKILL,
        "SIGTERM" => libc::SIGTERM,
        "SIGINT" => libc::SIGINT,
        "SIGHUP" => libc::SIGHUP,
        "SIGUSR1" => libc::SIGUSR1,
        "SIGUSR2" => libc::SIGUSR2,
        other => {
            return Err(AgentError::InvalidSpec(format!(
                "unsupported signal '{other}'"
            )))
        }
    };
    tracing::info!(container = %dir_name, pid = pid, signal = %canonical, "killing sandboxed process");
    // SAFETY: plain kill(2) on a recorded pid; no memory-safety obligations.
    if unsafe { libc::kill(pid as i32, signum) } != 0 {
        let err = std::io::Error::last_os_error();
        return Err(AgentError::Internal(format!(
            "kill({pid}, {canonical}) failed: {err}"
        )));
    }
    Ok(())
}
/// Image tagging has no meaning for this runtime; always reports
/// `Unsupported`.
async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
    let reason = "tag_image is not supported by the macOS sandbox runtime";
    Err(AgentError::Unsupported(reason.into()))
}
}