use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{ContainerId, ContainerState, Runtime};
use std::collections::HashMap;
use std::ffi::CString;
use std::net::IpAddr;
use std::os::raw::{c_char, c_int, c_uint};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{RegistryAuth, ServiceSpec};
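// Raw function-pointer aliases for the subset of the libkrun C API this
// runtime uses. Signatures mirror libkrun.h: calls return 0 (or, for
// `krun_create_ctx`, a non-negative context id) on success and a negative
// value on error.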
type KrunSetLogLevel = unsafe extern "C" fn(level: c_uint) -> c_int;
type KrunCreateCtx = unsafe extern "C" fn() -> c_int;
type KrunFreeCtx = unsafe extern "C" fn(ctx_id: c_uint) -> c_int;
type KrunSetVmConfig =
unsafe extern "C" fn(ctx_id: c_uint, num_vcpus: c_uint, ram_mib: c_uint) -> c_int;
type KrunSetRoot = unsafe extern "C" fn(ctx_id: c_uint, root_path: *const c_char) -> c_int;
type KrunSetWorkdir = unsafe extern "C" fn(ctx_id: c_uint, workdir_path: *const c_char) -> c_int;
type KrunSetExec = unsafe extern "C" fn(
ctx_id: c_uint,
exec_path: *const c_char,
argv: *const *const c_char,
envp: *const *const c_char,
) -> c_int;
type KrunSetGpuOptions2 = unsafe extern "C" fn(ctx_id: c_uint, virgl_flags: c_uint) -> c_int;
type KrunSetTsi = unsafe extern "C" fn(ctx_id: c_uint) -> c_int;
type KrunStartEnter = unsafe extern "C" fn(ctx_id: c_uint) -> c_int;
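/// Dynamically loaded libkrun entry points.
///
/// The `_lib` handle must outlive the function pointers resolved from it,
/// so it is stored alongside them.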
struct LibKrun {
_lib: libloading::Library,
set_log_level: KrunSetLogLevel,
create_ctx: KrunCreateCtx,
free_ctx: KrunFreeCtx,
set_vm_config: KrunSetVmConfig,
set_root: KrunSetRoot,
set_workdir: KrunSetWorkdir,
set_exec: KrunSetExec,
set_gpu_options2: KrunSetGpuOptions2,
set_tsi: KrunSetTsi,
start_enter: KrunStartEnter,
}
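// SAFETY: the struct holds only the library handle and plain C function
// pointers, neither of which has thread affinity. libkrun itself is assumed
// (not verified) to tolerate calls from multiple threads; each VM context
// here is driven by a single dedicated thread.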
#[allow(unsafe_code)]
unsafe impl Send for LibKrun {}
#[allow(unsafe_code)]
unsafe impl Sync for LibKrun {}
impl LibKrun {
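    /// Loads libkrun from the usual install locations and resolves every
    /// required symbol eagerly, so a missing symbol surfaces at load time
    /// rather than at first use.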
#[allow(unsafe_code)]
fn load() -> Result<Self> {
let lib_paths = [
"libkrun.dylib",
"/usr/local/lib/libkrun.dylib",
"/opt/homebrew/lib/libkrun.dylib",
];
let lib = lib_paths
.iter()
.find_map(|path| unsafe { libloading::Library::new(*path).ok() })
.ok_or_else(|| {
AgentError::Configuration(
"libkrun not found. Install via: brew tap slp/krunkit && brew install krunkit\n\
Searched: libkrun.dylib, /usr/local/lib/libkrun.dylib, /opt/homebrew/lib/libkrun.dylib"
.to_string(),
)
})?;
unsafe {
let set_log_level: KrunSetLogLevel =
*lib.get(b"krun_set_log_level\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_log_level: {e}"))
})?;
let create_ctx: KrunCreateCtx = *lib.get(b"krun_create_ctx\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_create_ctx: {e}"))
})?;
let free_ctx: KrunFreeCtx = *lib.get(b"krun_free_ctx\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_free_ctx: {e}"))
})?;
let set_vm_config: KrunSetVmConfig =
*lib.get(b"krun_set_vm_config\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_vm_config: {e}"))
})?;
let set_root: KrunSetRoot = *lib.get(b"krun_set_root\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_root: {e}"))
})?;
let set_workdir: KrunSetWorkdir = *lib.get(b"krun_set_workdir\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_workdir: {e}"))
})?;
let set_exec: KrunSetExec = *lib.get(b"krun_set_exec\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_exec: {e}"))
})?;
let set_gpu_options2: KrunSetGpuOptions2 =
*lib.get(b"krun_set_gpu_options2\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_gpu_options2: {e}"))
})?;
let set_tsi: KrunSetTsi = *lib.get(b"krun_set_tsi\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_set_tsi: {e}"))
})?;
let start_enter: KrunStartEnter = *lib.get(b"krun_start_enter\0").map_err(|e| {
AgentError::Configuration(format!("libkrun missing krun_start_enter: {e}"))
})?;
Ok(Self {
_lib: lib,
set_log_level,
create_ctx,
free_ctx,
set_vm_config,
set_root,
set_workdir,
set_exec,
set_gpu_options2,
set_tsi,
start_enter,
})
}
}
}
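/// Per-container bookkeeping for a single libkrun microVM.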
struct VmContainer {
state: ContainerState,
ctx_id: Option<u32>,
state_dir: PathBuf,
rootfs_dir: PathBuf,
log_file: PathBuf,
started_at: Option<Instant>,
spec: ServiceSpec,
vm_thread: Option<std::thread::JoinHandle<i32>>,
gpu_enabled: bool,
ram_mib: u32,
vcpus: u32,
}
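/// macOS container runtime that boots each container as a lightweight
/// libkrun microVM with a private copy of the image rootfs.
///
/// `api` is `None` when libkrun is not installed; the runtime still
/// constructs, but refuses to start containers.
///
/// A minimal lifecycle sketch (`spec` and `id` are hypothetical values the
/// caller must construct):
///
/// ```ignore
/// use crate::runtime::Runtime;
///
/// let runtime = VmRuntime::new(None)?;
/// runtime.pull_image(&spec.image.name.to_string()).await?;
/// runtime.create_container(&id, &spec).await?;
/// runtime.start_container(&id).await?;
/// let exit_code = runtime.wait_container(&id).await?;
/// ```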
pub struct VmRuntime {
api: Option<Arc<LibKrun>>,
data_dir: PathBuf,
log_dir: PathBuf,
containers: Arc<RwLock<HashMap<String, VmContainer>>>,
image_rootfs: Arc<RwLock<HashMap<String, PathBuf>>>,
gpu_in_use: Arc<AtomicBool>,
}
impl std::fmt::Debug for VmRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("VmRuntime")
.field("data_dir", &self.data_dir)
.field("log_dir", &self.log_dir)
.field("libkrun_loaded", &self.api.is_some())
.finish_non_exhaustive()
}
}
impl VmRuntime {
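    /// Creates the runtime, preparing the on-disk layout and attempting to
    /// load libkrun. A missing libkrun is downgraded to a warning so the
    /// agent can still come up and report the problem at start time.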
#[allow(unsafe_code)]
pub fn new(_auth_ctx: Option<crate::runtime::ContainerAuthContext>) -> Result<Self> {
let data_dir = zlayer_paths::ZLayerDirs::default_data_dir();
let log_dir = zlayer_paths::ZLayerDirs::default_log_dir();
std::fs::create_dir_all(data_dir.join("vms")).map_err(|e| {
AgentError::Configuration(format!(
"Failed to create VM state dir {}: {}",
data_dir.join("vms").display(),
e
))
})?;
std::fs::create_dir_all(data_dir.join("images")).map_err(|e| {
AgentError::Configuration(format!(
"Failed to create images dir {}: {}",
data_dir.join("images").display(),
e
))
})?;
std::fs::create_dir_all(&log_dir).map_err(|e| {
AgentError::Configuration(format!(
"Failed to create log dir {}: {}",
log_dir.display(),
e
))
})?;
let api = match LibKrun::load() {
Ok(lib) => {
unsafe { (lib.set_log_level)(2) };
tracing::info!("libkrun loaded successfully");
Some(Arc::new(lib))
}
Err(e) => {
tracing::warn!(
"libkrun not available: {e}. VM runtime will be non-functional. \
Install via: brew tap slp/krunkit && brew install krunkit"
);
None
}
};
tracing::info!(
data_dir = %data_dir.display(),
log_dir = %log_dir.display(),
libkrun_available = api.is_some(),
"macOS VM runtime initialized"
);
Ok(Self {
api,
data_dir,
log_dir,
containers: Arc::new(RwLock::new(HashMap::new())),
image_rootfs: Arc::new(RwLock::new(HashMap::new())),
gpu_in_use: Arc::new(AtomicBool::new(false)),
})
}
fn container_dir_name(id: &ContainerId) -> String {
format!("{}-{}", id.service, id.replica)
}
fn vm_dir(&self, id: &ContainerId) -> PathBuf {
self.data_dir.join("vms").join(Self::container_dir_name(id))
}
fn images_dir(&self) -> PathBuf {
self.data_dir.join("images")
}
fn require_api(&self) -> Result<&Arc<LibKrun>> {
self.api.as_ref().ok_or_else(|| {
AgentError::Configuration(
"libkrun is not available. Cannot start VM containers.\n\
Install via: brew tap slp/krunkit && brew install krunkit"
.to_string(),
)
})
}
}
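/// Maps an image reference to a filesystem-safe directory name, e.g.
/// `ghcr.io/org/app:1.0` becomes `ghcr.io_org_app_1.0`.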
fn sanitize_image_name(image: &str) -> String {
image.replace(['/', ':', '@'], "_")
}
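/// Parses a Kubernetes-style memory quantity into MiB. `Gi`, `Mi`, and `Ki`
/// suffixes are recognized; a bare number is treated as bytes. For example,
/// `"1Gi"` yields 1024, `"512Mi"` yields 512, and `"1073741824"` yields 1024.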
#[allow(clippy::cast_possible_truncation)]
fn parse_memory_to_mib(s: &str) -> Option<u32> {
let s = s.trim();
if let Some(num) = s.strip_suffix("Gi") {
num.parse::<u32>().ok().map(|v| v * 1024)
} else if let Some(num) = s.strip_suffix("Mi") {
num.parse::<u32>().ok()
} else if let Some(num) = s.strip_suffix("Ki") {
num.parse::<u32>().ok().map(|v| v / 1024)
} else {
s.parse::<u64>().ok().map(|v| (v / (1024 * 1024)) as u32)
}
}
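/// Clamps the requested vCPU count to `1..=host_cores`; exceeding the host
/// core count hangs libkrun, hence the hard ceiling.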
#[allow(clippy::cast_possible_truncation)]
fn safe_vcpu_count(requested: u32) -> u32 {
let host_cores = num_cpus::get() as u32;
    let clamped = requested.clamp(1, host_cores);
if requested > host_cores {
tracing::warn!(
requested = requested,
host_cores = host_cores,
clamped = clamped,
"Clamping vCPU count to host core count (libkrun hangs if exceeded)"
);
}
clamped
}
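/// Resolves the guest program and argument list, mirroring OCI semantics: a
/// non-empty `entrypoint` wins (with `args` appended), bare `args` are
/// treated as the full command, and `/bin/sh` is the last-resort default.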
fn resolve_entrypoint(spec: &ServiceSpec) -> (String, Vec<String>) {
if let Some(ref entrypoint) = spec.command.entrypoint {
if !entrypoint.is_empty() {
let program = entrypoint[0].clone();
let mut args: Vec<String> = entrypoint[1..].to_vec();
if let Some(ref extra_args) = spec.command.args {
args.extend(extra_args.iter().cloned());
}
return (program, args);
}
}
if let Some(ref cmd_args) = spec.command.args {
if !cmd_args.is_empty() {
let program = cmd_args[0].clone();
let args = cmd_args[1..].to_vec();
return (program, args);
}
}
("/bin/sh".to_string(), vec![])
}
fn to_cstring(s: &str, context: &str) -> Result<CString> {
CString::new(s)
.map_err(|e| AgentError::InvalidSpec(format!("{context} contains null byte: {e}")))
}
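/// Builds a null-terminated, C-style pointer array over owned `CString`s.
/// The pointers borrow from the strings, so callers must keep the returned
/// `Vec<CString>` alive for the duration of the FFI call.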
fn build_c_string_array(strings: &[String]) -> Result<(Vec<CString>, Vec<*const c_char>)> {
let c_strings: Vec<CString> = strings
.iter()
.map(|s| {
CString::new(s.as_str())
.map_err(|e| AgentError::InvalidSpec(format!("String contains null byte: {e}")))
})
.collect::<Result<Vec<_>>>()?;
let mut ptrs: Vec<*const c_char> = c_strings.iter().map(|cs| cs.as_ptr()).collect();
ptrs.push(std::ptr::null());
Ok((c_strings, ptrs))
}
fn build_env_array(env: &HashMap<String, String>) -> Result<(Vec<CString>, Vec<*const c_char>)> {
let env_strings: Vec<String> = env.iter().map(|(k, v)| format!("{k}={v}")).collect();
build_c_string_array(&env_strings)
}
#[async_trait::async_trait]
impl Runtime for VmRuntime {
async fn pull_image(&self, image: &str) -> Result<()> {
self.pull_image_with_policy(image, zlayer_spec::PullPolicy::IfNotPresent, None)
.await
}
async fn pull_image_with_policy(
&self,
image: &str,
policy: zlayer_spec::PullPolicy,
_auth: Option<&RegistryAuth>,
) -> Result<()> {
let safe_name = sanitize_image_name(image);
let image_dir = self.images_dir().join(&safe_name);
let rootfs_dir = image_dir.join("rootfs");
match policy {
            zlayer_spec::PullPolicy::Always | zlayer_spec::PullPolicy::Newer => {
                // Both policies fall through to a fresh pull below.
            }
zlayer_spec::PullPolicy::IfNotPresent => {
if rootfs_dir.exists() {
tracing::debug!(image = %image, "Image already present, skipping pull");
let mut images = self.image_rootfs.write().await;
images.insert(safe_name, rootfs_dir);
return Ok(());
}
}
zlayer_spec::PullPolicy::Never => {
if !rootfs_dir.exists() {
return Err(AgentError::PullFailed {
image: image.to_string(),
reason: "Image not present and pull policy is Never".to_string(),
});
}
let mut images = self.image_rootfs.write().await;
images.insert(safe_name, rootfs_dir);
return Ok(());
}
}
tracing::info!(
image = %image,
"Pulling image for VM runtime (Linux OCI images supported natively)"
);
tokio::fs::create_dir_all(&rootfs_dir)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("Failed to create rootfs dir: {e}"),
})?;
let cache_path = self.images_dir().join("blobs.redb");
let cache_type = zlayer_registry::CacheType::persistent_at(&cache_path);
let blob_cache = cache_type
.build()
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("Failed to open blob cache: {e}"),
})?;
let puller = zlayer_registry::ImagePuller::with_cache(blob_cache);
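        // NOTE: the caller-supplied `_auth` is currently unused; pulls are
        // always anonymous in this runtime.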
let auth = zlayer_registry::RegistryAuth::Anonymous;
let layers = puller
.pull_image(image, &auth)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("Failed to pull image layers: {e}"),
})?;
tracing::info!(
image = %image,
layer_count = layers.len(),
"Extracting layers to image rootfs"
);
let mut unpacker = zlayer_registry::LayerUnpacker::new(rootfs_dir.clone());
unpacker
.unpack_layers(&layers)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("Failed to extract rootfs: {e}"),
})?;
let mut images = self.image_rootfs.write().await;
images.insert(safe_name, rootfs_dir.clone());
tracing::info!(
image = %image,
rootfs = %rootfs_dir.display(),
"Image pulled successfully"
);
Ok(())
}
#[allow(clippy::too_many_lines)]
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let vm_dir = self.vm_dir(id);
let rootfs_dir = vm_dir.join("rootfs");
if vm_dir.exists() {
tracing::warn!(
container = %dir_name,
"Stale VM directory found, cleaning up"
);
if let Err(e) = tokio::fs::remove_dir_all(&vm_dir).await {
tracing::warn!(
container = %dir_name,
error = %e,
"Failed to remove stale VM directory"
);
}
}
tokio::fs::create_dir_all(&vm_dir)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to create VM dir {}: {e}", vm_dir.display()),
})?;
let image_name_str = spec.image.name.to_string();
let safe_image_name = sanitize_image_name(&image_name_str);
let image_rootfs = {
let images = self.image_rootfs.read().await;
images.get(&safe_image_name).cloned()
};
let image_rootfs =
image_rootfs.unwrap_or_else(|| self.images_dir().join(&safe_image_name).join("rootfs"));
if !image_rootfs.exists() {
return Err(AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!(
"Image rootfs not found at {}. Run pull_image first.",
image_rootfs.display()
),
});
}
tracing::debug!(
container = %dir_name,
src = %image_rootfs.display(),
dst = %rootfs_dir.display(),
"Copying rootfs for VM"
);
copy_directory_recursive(&image_rootfs, &rootfs_dir)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!(
"Failed to copy rootfs from {} to {}: {e}",
image_rootfs.display(),
rootfs_dir.display()
),
})?;
let config_json =
serde_json::to_string_pretty(spec).map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to serialize spec: {e}"),
})?;
tokio::fs::write(vm_dir.join("config.json"), &config_json)
.await
.map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("Failed to write config.json: {e}"),
})?;
let log_file = vm_dir.join("console.log");
let gpu_enabled = spec
.resources
.gpu
.as_ref()
.is_some_and(|gpu| gpu.vendor == "apple");
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let vcpus = spec
.resources
.cpu
.map_or(2, |cpu| (cpu.ceil() as u32).max(1));
let vcpus = safe_vcpu_count(vcpus);
let ram_mib = spec
.resources
.memory
.as_ref()
.and_then(|m| parse_memory_to_mib(m))
.unwrap_or(512)
.max(128);
let container = VmContainer {
state: ContainerState::Pending,
ctx_id: None,
state_dir: vm_dir,
rootfs_dir,
log_file,
started_at: None,
spec: spec.clone(),
vm_thread: None,
gpu_enabled,
ram_mib,
vcpus,
};
let mut containers = self.containers.write().await;
containers.insert(dir_name.clone(), container);
tracing::info!(
container = %dir_name,
image = %spec.image.name,
vcpus = vcpus,
ram_mib = ram_mib,
gpu = gpu_enabled,
"VM container created"
);
Ok(())
}
#[allow(clippy::too_many_lines, unsafe_code)]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let api = self.require_api()?;
let (rootfs_dir, log_file, spec, gpu_enabled, vcpus, ram_mib) = {
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not created".to_string(),
})?;
(
container.rootfs_dir.clone(),
container.log_file.clone(),
container.spec.clone(),
container.gpu_enabled,
container.vcpus,
container.ram_mib,
)
};
if gpu_enabled {
let was_in_use = self.gpu_in_use.swap(true, Ordering::SeqCst);
if was_in_use {
return Err(AgentError::StartFailed {
id: dir_name,
reason: "Cannot start GPU-enabled VM: another GPU VM is already running. \
libkrun supports only one GPU-enabled VM at a time due to Metal \
GPU context limitations. Stop the existing GPU container first, \
or use the SandboxRuntime (Approach A) which supports multiple \
GPU processes via Metal's built-in multi-process support."
.to_string(),
});
}
}
let (program, args) = resolve_entrypoint(&spec);
tracing::info!(
container = %dir_name,
program = %program,
args = ?args,
vcpus = vcpus,
ram_mib = ram_mib,
gpu = gpu_enabled,
"Starting libkrun VM"
);
let ctx_id = unsafe { (api.create_ctx)() };
if ctx_id < 0 {
if gpu_enabled {
self.gpu_in_use.store(false, Ordering::SeqCst);
}
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!("krun_create_ctx failed with code {ctx_id}"),
});
}
#[allow(clippy::cast_sign_loss)]
let ctx_id = ctx_id as u32;
let cleanup_on_error = |api: &LibKrun, ctx: u32, gpu: bool, gpu_flag: &AtomicBool| {
unsafe { (api.free_ctx)(ctx) };
if gpu {
gpu_flag.store(false, Ordering::SeqCst);
}
};
let ret = unsafe { (api.set_vm_config)(ctx_id, vcpus, ram_mib) };
if ret != 0 {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!(
"krun_set_vm_config({vcpus} vCPUs, {ram_mib} MiB) failed with code {ret}"
),
});
}
        // Early `?` returns from here on must not leak the krun context or
        // the GPU flag, so each error path runs the cleanup closure first.
        let rootfs_cstr = to_cstring(rootfs_dir.to_str().unwrap_or("/invalid"), "rootfs path")
            .map_err(|e| {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                e
            })?;
let ret = unsafe { (api.set_root)(ctx_id, rootfs_cstr.as_ptr()) };
if ret != 0 {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!(
"krun_set_root({}) failed with code {ret}",
rootfs_dir.display()
),
});
}
if let Some(ref workdir) = spec.command.workdir {
            let workdir_cstr = to_cstring(workdir, "workdir path").map_err(|e| {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                e
            })?;
let ret = unsafe { (api.set_workdir)(ctx_id, workdir_cstr.as_ptr()) };
if ret != 0 {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!("krun_set_workdir({workdir}) failed with code {ret}"),
});
}
}
if gpu_enabled {
let ret = unsafe { (api.set_gpu_options2)(ctx_id, 0) };
if ret != 0 {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!("krun_set_gpu_options2 failed with code {ret}"),
});
}
tracing::info!(
container = %dir_name,
"GPU forwarding enabled (Venus-Vulkan/ggml)"
);
}
let ret = unsafe { (api.set_tsi)(ctx_id) };
if ret != 0 {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!("krun_set_tsi failed with code {ret}"),
});
}
        let exec_cstr = to_cstring(&program, "exec path").map_err(|e| {
            cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
            e
        })?;
let mut argv_strings = vec![program.clone()];
argv_strings.extend(args.clone());
let ret = {
            let (_argv_cstrings, argv_ptrs) = build_c_string_array(&argv_strings).map_err(|e| {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                e
            })?;
            let (_envp_cstrings, envp_ptrs) = build_env_array(&spec.env).map_err(|e| {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                e
            })?;
unsafe {
(api.set_exec)(
ctx_id,
exec_cstr.as_ptr(),
argv_ptrs.as_ptr(),
envp_ptrs.as_ptr(),
)
}
};
if ret != 0 {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
return Err(AgentError::StartFailed {
id: dir_name,
reason: format!("krun_set_exec({program}) failed with code {ret}"),
});
}
let api_clone = Arc::clone(api);
let thread_dir_name = dir_name.clone();
let thread_log_file = log_file.clone();
let vm_thread = std::thread::Builder::new()
.name(format!("krun-vm-{dir_name}"))
.spawn(move || {
tracing::debug!(
container = %thread_dir_name,
ctx_id = ctx_id,
"VM thread starting krun_start_enter"
);
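                // Touch the console log so log readers see an empty file
                // rather than NotFound before the guest produces output.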
let _ = std::fs::File::create(&thread_log_file);
let exit_code = unsafe { (api_clone.start_enter)(ctx_id) };
tracing::info!(
container = %thread_dir_name,
ctx_id = ctx_id,
exit_code = exit_code,
"VM exited"
);
exit_code
})
.map_err(|e| {
cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
AgentError::StartFailed {
id: dir_name.clone(),
reason: format!("Failed to spawn VM thread: {e}"),
}
})?;
let mut containers = self.containers.write().await;
if let Some(container) = containers.get_mut(&dir_name) {
container.ctx_id = Some(ctx_id);
container.state = ContainerState::Running;
container.started_at = Some(Instant::now());
container.vm_thread = Some(vm_thread);
}
tracing::info!(
container = %dir_name,
ctx_id = ctx_id,
"VM started"
);
Ok(())
}
#[allow(unsafe_code)]
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let (ctx_id, gpu_enabled, vm_thread) = {
let mut containers = self.containers.write().await;
let container = containers
.get_mut(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
if matches!(
container.state,
ContainerState::Exited { .. } | ContainerState::Failed { .. }
) {
return Ok(());
}
container.state = ContainerState::Stopping;
(
container.ctx_id,
container.gpu_enabled,
container.vm_thread.take(),
)
};
tracing::info!(
container = %dir_name,
ctx_id = ?ctx_id,
timeout = ?timeout,
"Stopping VM"
);
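        // No graceful-shutdown path is wired up here: freeing the context
        // tears the VM down immediately, so `timeout` is effectively unused.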
if let (Some(api), Some(ctx)) = (&self.api, ctx_id) {
let ret = unsafe { (api.free_ctx)(ctx) };
if ret != 0 {
tracing::warn!(
container = %dir_name,
ctx_id = ctx,
ret = ret,
"krun_free_ctx returned non-zero (VM may have already exited)"
);
}
}
if let Some(thread) = vm_thread {
let exit_code = tokio::task::spawn_blocking(move || {
thread.join().unwrap_or_else(|_| {
tracing::warn!("VM thread panicked during join");
-1
})
})
.await
.unwrap_or(-1);
let mut containers = self.containers.write().await;
if let Some(c) = containers.get_mut(&dir_name) {
c.state = ContainerState::Exited { code: exit_code };
c.ctx_id = None;
}
tracing::info!(
container = %dir_name,
exit_code = exit_code,
"VM stopped"
);
} else {
let mut containers = self.containers.write().await;
if let Some(c) = containers.get_mut(&dir_name) {
c.state = ContainerState::Exited { code: 0 };
c.ctx_id = None;
}
}
if gpu_enabled {
self.gpu_in_use.store(false, Ordering::SeqCst);
}
Ok(())
}
#[allow(unsafe_code)]
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
let dir_name = Self::container_dir_name(id);
tracing::info!(container = %dir_name, "Removing VM container");
{
let containers = self.containers.read().await;
if let Some(c) = containers.get(&dir_name) {
if matches!(
c.state,
ContainerState::Running
| ContainerState::Stopping
| ContainerState::Initializing
) {
drop(containers);
let _ = self.stop_container(id, Duration::from_secs(5)).await;
}
}
}
let removed_state_dir = {
let mut containers = self.containers.write().await;
if let Some(c) = containers.remove(&dir_name) {
if c.gpu_enabled
&& matches!(c.state, ContainerState::Running | ContainerState::Stopping)
{
self.gpu_in_use.store(false, Ordering::SeqCst);
}
if let (Some(api), Some(ctx)) = (&self.api, c.ctx_id) {
let _ = unsafe { (api.free_ctx)(ctx) };
}
Some(c.state_dir)
} else {
None
}
};
let vm_dir = removed_state_dir.unwrap_or_else(|| self.vm_dir(id));
if vm_dir.exists() {
tokio::fs::remove_dir_all(&vm_dir).await.map_err(|e| {
AgentError::Internal(format!("Failed to remove VM dir {}: {e}", vm_dir.display()))
})?;
}
tracing::info!(container = %dir_name, "VM container removed");
Ok(())
}
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
let dir_name = Self::container_dir_name(id);
let mut containers = self.containers.write().await;
let container = containers
.get_mut(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
match &container.state {
ContainerState::Exited { .. } | ContainerState::Failed { .. } => {
return Ok(container.state.clone());
}
ContainerState::Pending => return Ok(ContainerState::Pending),
_ => {}
}
if let Some(ref thread) = container.vm_thread {
if thread.is_finished() {
let Some(thread) = container.vm_thread.take() else {
return Ok(container.state.clone());
};
let Ok(exit_code) = thread.join() else {
container.state = ContainerState::Failed {
reason: "VM thread panicked".to_string(),
};
return Ok(container.state.clone());
};
container.state = ContainerState::Exited { code: exit_code };
container.ctx_id = None;
if container.gpu_enabled {
self.gpu_in_use.store(false, Ordering::SeqCst);
}
}
}
Ok(container.state.clone())
}
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
let dir_name = Self::container_dir_name(id);
let log_file = {
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
container.log_file.clone()
};
let content = match tokio::fs::read_to_string(&log_file).await {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Ok(Vec::new());
}
Err(e) => {
return Err(AgentError::Internal(format!(
"Failed to read VM console log {}: {e}",
log_file.display()
)));
}
};
let now = chrono::Utc::now();
let source = LogSource::Container(id.to_string());
let mut entries: Vec<LogEntry> = content
.lines()
.map(|line| LogEntry {
timestamp: now,
stream: LogStream::Stdout,
message: line.to_string(),
source: source.clone(),
service: None,
deployment: None,
})
.collect();
if tail > 0 && entries.len() > tail {
let start = entries.len() - tail;
entries = entries.split_off(start);
}
Ok(entries)
}
async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
let dir_name = Self::container_dir_name(id);
let containers = self.containers.read().await;
if !containers.contains_key(&dir_name) {
return Err(AgentError::NotFound {
container: dir_name,
reason: "Container not found".to_string(),
});
}
Err(AgentError::Internal(
"exec is not supported in the VM runtime. libkrun does not provide an API \
to execute commands inside a running VM. This would require a vsock-based \
agent or SSH server inside the guest. Use the SandboxRuntime (MacSandbox) \
if you need exec capability."
.to_string(),
))
}
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
let dir_name = Self::container_dir_name(id);
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
if !matches!(container.state, ContainerState::Running) {
return Err(AgentError::Internal(
"Container is not running -- cannot collect stats".to_string(),
));
}
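        // libkrun exposes no per-VM accounting, so report coarse
        // placeholders: wall-clock uptime stands in for CPU time and the
        // configured RAM allocation stands in for memory usage.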
#[allow(clippy::cast_possible_truncation)]
let uptime_usec = container
.started_at
.map_or(0, |t| t.elapsed().as_micros() as u64);
let memory_limit = u64::from(container.ram_mib) * 1024 * 1024;
Ok(ContainerStats {
cpu_usage_usec: uptime_usec,
memory_bytes: memory_limit,
memory_limit,
timestamp: Instant::now(),
})
}
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
let dir_name = Self::container_dir_name(id);
{
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
if let ContainerState::Exited { code } = &container.state {
return Ok(*code);
}
}
tracing::debug!(container = %dir_name, "Waiting for VM to exit");
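        // The VM thread handle is the only completion signal available, so
        // poll it at a coarse interval.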
loop {
{
let mut containers = self.containers.write().await;
if let Some(container) = containers.get_mut(&dir_name) {
if let ContainerState::Exited { code } = &container.state {
return Ok(*code);
}
if let ContainerState::Failed { reason } = &container.state {
return Err(AgentError::Internal(format!("VM failed: {reason}")));
}
if let Some(ref thread) = container.vm_thread {
if thread.is_finished() {
if let Some(thread) = container.vm_thread.take() {
let exit_code = thread.join().unwrap_or(-1);
container.state = ContainerState::Exited { code: exit_code };
container.ctx_id = None;
if container.gpu_enabled {
self.gpu_in_use.store(false, Ordering::SeqCst);
}
tracing::info!(
container = %dir_name,
exit_code = exit_code,
"VM exited (via wait)"
);
return Ok(exit_code);
}
}
} else {
return Err(AgentError::Internal(
"VM has no active thread to wait on".to_string(),
));
}
} else {
return Err(AgentError::NotFound {
container: dir_name,
reason: "Container disappeared while waiting".to_string(),
});
}
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
}
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
let dir_name = Self::container_dir_name(id);
let log_file = {
let containers = self.containers.read().await;
let container = containers
.get(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "Container not found".to_string(),
})?;
container.log_file.clone()
};
let content = match tokio::fs::read_to_string(&log_file).await {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => {
return Err(AgentError::Internal(format!(
"Failed to read VM console log: {e}"
)));
}
};
let now = chrono::Utc::now();
let source = LogSource::Container(id.to_string());
let entries = content
.lines()
.map(|line| LogEntry {
timestamp: now,
stream: LogStream::Stdout,
message: line.to_string(),
source: source.clone(),
service: None,
deployment: None,
})
.collect();
Ok(entries)
}
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
let dir_name = Self::container_dir_name(id);
let containers = self.containers.read().await;
if !containers.contains_key(&dir_name) {
return Err(AgentError::NotFound {
container: dir_name,
reason: "Container not found".to_string(),
});
}
Ok(None)
}
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
let dir_name = Self::container_dir_name(id);
let containers = self.containers.read().await;
if !containers.contains_key(&dir_name) {
return Err(AgentError::NotFound {
container: dir_name,
reason: "Container not found".to_string(),
});
}
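        // TSI impersonates guest sockets on the host, so the VM is assumed
        // to be reachable via loopback.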
Ok(Some(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)))
}
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
let canonical = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
match canonical.as_str() {
"SIGKILL" | "SIGTERM" => {
self.stop_container(id, Duration::from_secs(0)).await
}
other => Err(AgentError::Unsupported(format!(
"signal '{other}' is not supported by the macOS VM runtime (only SIGKILL and SIGTERM map to VM teardown)"
))),
}
}
async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
Err(AgentError::Unsupported(
"tag_image is not supported by the macOS VM runtime".into(),
))
}
}
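/// Recursively copies a directory tree, preserving symlinks as links and
/// mirroring directory permissions onto each created directory. Recursion
/// goes through `Box::pin` so the recursive future has a known size.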
async fn copy_directory_recursive(
src: &std::path::Path,
dst: &std::path::Path,
) -> std::io::Result<()> {
tokio::fs::create_dir_all(dst).await?;
let mut entries = tokio::fs::read_dir(src).await?;
while let Some(entry) = entries.next_entry().await? {
let entry_path = entry.path();
let file_name = entry.file_name();
let dest_path = dst.join(&file_name);
let file_type = entry.file_type().await?;
if file_type.is_dir() {
Box::pin(copy_directory_recursive(&entry_path, &dest_path)).await?;
} else if file_type.is_file() {
tokio::fs::copy(&entry_path, &dest_path).await?;
} else if file_type.is_symlink() {
let link_target = tokio::fs::read_link(&entry_path).await?;
tokio::fs::symlink(&link_target, &dest_path).await?;
}
}
let src_meta = tokio::fs::metadata(src).await?;
tokio::fs::set_permissions(dst, src_meta.permissions()).await?;
Ok(())
}