#![allow(unsafe_code)]
use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{
ContainerId, ContainerState, LogChannel, LogChunk, LogsStream, LogsStreamOptions,
OverlayAttachKind, Runtime, StatsSample, StatsStream,
};
use crate::runtimes::macos_vz_shared::{
self, clamp_cpu_count, clamp_memory_bytes, clone_or_copy, file_url, read_vm_state,
run_vm_lifecycle, spec_memory_mib, spec_vcpus, LiveVm, QueuePinned, VmLifecycleOp,
};
use std::collections::HashMap;
use std::io::Write as _;
use std::net::IpAddr;
use std::os::unix::io::FromRawFd;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{RegistryAuth, ServiceSpec};
use zlayer_vzagent::proto;
use block2::RcBlock;
use dispatch2::DispatchQueue;
use objc2::rc::Retained;
use objc2::{AnyThread, ClassType};
use objc2_foundation::{NSArray, NSError, NSFileHandle, NSString};
use objc2_virtualization::{
VZBootLoader, VZDirectoryShare, VZDirectorySharingDeviceConfiguration,
VZEntropyDeviceConfiguration, VZFileHandleSerialPortAttachment, VZGenericPlatformConfiguration,
VZLinuxBootLoader, VZMACAddress, VZNATNetworkDeviceAttachment, VZNetworkDeviceConfiguration,
VZPlatformConfiguration, VZSharedDirectory, VZSingleDirectoryShare,
VZSocketDeviceConfiguration, VZVirtioConsoleDeviceSerialPortConfiguration,
VZVirtioEntropyDeviceConfiguration, VZVirtioFileSystemDeviceConfiguration,
VZVirtioNetworkDeviceConfiguration, VZVirtioSocketConnection, VZVirtioSocketDevice,
VZVirtioSocketDeviceConfiguration, VZVirtualMachine, VZVirtualMachineConfiguration,
VZVirtualMachineState,
};
/// Paths to the Linux kernel artifacts used to boot a guest VM.
#[derive(Clone, Debug)]
pub(crate) struct LinuxKernel {
/// Kernel image file; handed to the VZ Linux boot loader as the kernel URL.
pub(crate) image: PathBuf,
/// Initial ramdisk file; set as the boot loader's initial ramdisk URL.
pub(crate) initramfs: PathBuf,
}
/// Shared record of what a guest workload produced, filled in while its
/// control stream is drained.
#[derive(Default)]
pub(crate) struct RunOutcome {
/// Output chunks captured from the workload, in arrival order.
logs: Mutex<Vec<LogEntry>>,
/// PID reported by the guest's `Started` frame, once received.
pid: Mutex<Option<i32>>,
/// Workload exit code, once known.
exit_code: Mutex<Option<i32>>,
/// Terminal container state (for example `Failed` after a vsock or agent
/// error), once one has been reached.
terminal: Mutex<Option<ContainerState>>,
}
/// Cursor state for following a container's logs.
///
/// NOTE(review): the code that consumes this struct is not in this part of
/// the file; the field notes below are inferred from names and types and
/// should be confirmed against the log-follow implementation.
struct LogFollowState {
/// Container table, same type as the runtime's `containers` map.
containers: Arc<RwLock<HashMap<String, VzLinuxContainer>>>,
/// Key of the followed container in `containers`.
dir_name: String,
/// Presumably the index of the next log entry to emit — TODO confirm.
next: usize,
/// Presumably set once the followed stream has ended — TODO confirm.
done: bool,
}
/// Per-container bookkeeping for one Linux guest VM, stored in the runtime's
/// container table.
pub(crate) struct VzLinuxContainer {
/// Current lifecycle state of the container.
state: ContainerState,
/// Per-container directory; removed from disk by `teardown_container`.
state_dir: PathBuf,
#[allow(dead_code)]
rootfs_dir: PathBuf,
/// Image rootfs directory (see `LinuxVmBuildInputs::image_rootfs_dir`, which
/// is shared read-only with the guest).
image_rootfs_dir: PathBuf,
/// Kernel artifacts used to boot this VM, when resolved.
kernel: Option<LinuxKernel>,
/// File receiving console output; also appended to by `capture_chunk` and
/// read back by `read_console`.
console_log: PathBuf,
/// MAC address string for the guest network device.
mac: String,
/// Service specification; its `port_mappings` drive the loopback forwarders.
spec: ServiceSpec,
/// Image configuration, when one is available for the image.
image_config: Option<zlayer_registry::ImageConfig>,
vcpus: u32,
#[allow(dead_code)]
ram_mib: u32,
#[allow(dead_code)]
started_at: Option<Instant>,
/// Handle to the running VM; `None` when no VM is live (forwarders exit
/// when they observe `None`).
live: Option<macos_vz_shared::LiveVm>,
/// Workload results shared with the drain and auto-remove logic.
outcome: Arc<RunOutcome>,
/// Background task draining the workload stream; aborted on teardown.
drain_task: Option<JoinHandle<()>>,
// NOTE(review): `cleanup_task` and `settime_task` are not aborted by
// `teardown_container`; their owners are outside this part of the file.
cleanup_task: Option<JoinHandle<()>>,
settime_task: Option<JoinHandle<()>>,
/// Active loopback forwarders; each task is aborted on teardown.
port_forwards: Vec<PortForward>,
/// Sender feeding stdin chunks to the workload, when stdin is attached.
stdin_tx: Option<std::sync::mpsc::Sender<Vec<u8>>>,
/// NOTE(review): presumably the container's overlay-network address —
/// not used in the visible code; confirm.
overlay_ip: Option<IpAddr>,
}
/// One running loopback forwarder for a container port.
struct PortForward {
/// Port inside the guest that connections are forwarded to; the host
/// listener binds the same port number on loopback.
container_port: u16,
/// Protocol of the mapping (only TCP mappings are spawned by
/// `spawn_port_forwards`).
protocol: zlayer_spec::PortProtocol,
/// The forwarder task; aborted when the container is torn down.
task: JoinHandle<()>,
}
/// Runtime that runs Linux containers as guest VMs on macOS.
pub struct VzLinuxRuntime {
/// Root data directory; VM state lives under `<data_dir>/vz/linux`.
data_dir: PathBuf,
#[allow(dead_code)]
log_dir: PathBuf,
/// Table of known containers keyed by `"<service>-<replica>"`.
containers: Arc<RwLock<HashMap<String, VzLinuxContainer>>>,
/// NOTE(review): presumably maps an image reference to its unpacked
/// rootfs directory — not used in the visible code; confirm.
image_rootfs: Arc<RwLock<HashMap<String, PathBuf>>>,
}
impl std::fmt::Debug for VzLinuxRuntime {
    /// Prints only the directory fields; the container tables are omitted and
    /// the output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("VzLinuxRuntime");
        builder.field("data_dir", &self.data_dir);
        builder.field("log_dir", &self.log_dir);
        builder.finish_non_exhaustive()
    }
}
impl VzLinuxRuntime {
/// Creates the runtime rooted at the default ZLayer data and log directories.
///
/// Ensures `<data>/vz/linux`, its `images` and `cache` subdirectories, and the
/// log directory exist. `_auth` is accepted but not used here.
///
/// # Errors
/// Returns `AgentError::Configuration` when any directory cannot be created.
pub fn new(_auth: Option<crate::runtime::ContainerAuthContext>) -> Result<Self> {
    let data_dir = zlayer_paths::ZLayerDirs::default_data_dir();
    let log_dir = zlayer_paths::ZLayerDirs::default_log_dir();
    let linux_dir = data_dir.join("vz").join("linux");
    let images_dir = linux_dir.join("images");
    let cache_dir = linux_dir.join("cache");

    for dir in [&linux_dir, &images_dir, &cache_dir, &log_dir] {
        if let Err(e) = std::fs::create_dir_all(dir) {
            return Err(AgentError::Configuration(format!(
                "Failed to create {}: {e}",
                dir.display()
            )));
        }
    }

    tracing::info!(
        vz_linux_dir = %linux_dir.display(),
        "macOS VZ Linux-guest runtime ready (default Linux path on macOS; \
         VM boot lands in a later phase)"
    );

    let containers = Arc::new(RwLock::new(HashMap::new()));
    let image_rootfs = Arc::new(RwLock::new(HashMap::new()));
    Ok(Self {
        data_dir,
        log_dir,
        containers,
        image_rootfs,
    })
}
/// Chooses registry credentials for pulling `image`.
///
/// Explicit spec-level `auth` is converted and used as-is; otherwise a
/// resolver built from the default auth configuration is consulted.
fn resolve_pull_auth(
    auth: Option<&RegistryAuth>,
    image: &str,
) -> zlayer_registry::RegistryAuth {
    if let Some(explicit) = auth {
        return zlayer_registry::spec_auth_to_oci(Some(explicit));
    }
    zlayer_core::AuthResolver::new(zlayer_core::AuthConfig::default()).resolve(image)
}
fn container_dir_name(id: &ContainerId) -> String {
format!("{}-{}", id.service, id.replica)
}
/// Derives a synthetic PID from the container id.
///
/// The service and replica are hashed, the low 30 bits are kept, and bit 30
/// is forced on, so the result always lies in `0x4000_0000..=0x7fff_ffff`.
fn pseudo_pid(id: &ContainerId) -> u32 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    id.service.hash(&mut hasher);
    id.replica.hash(&mut hasher);

    let masked = hasher.finish() & 0x3fff_ffff;
    let low_bits = u32::try_from(masked).unwrap_or(0);
    low_bits | 0x4000_0000
}
/// Root directory for Linux-guest state: `<data_dir>/vz/linux`.
fn linux_dir(&self) -> PathBuf {
    let mut path = self.data_dir.clone();
    path.push("vz");
    path.push("linux");
    path
}
/// Directory holding per-image data: `<data_dir>/vz/linux/images`.
fn images_dir(&self) -> PathBuf {
    let mut path = self.linux_dir();
    path.push("images");
    path
}
/// Finds an already-unpacked rootfs for `image`, if any.
///
/// The literal reference is tried first; after that each `name:reference`
/// spelling from `zlayer_types::image_ref_candidates` is tried in order. A
/// rootfs counts only when its directory exists and is non-empty.
fn resolve_existing_rootfs(&self, image: &str) -> Option<PathBuf> {
    let images_dir = self.images_dir();
    let rootfs_for =
        |spelling: &str| images_dir.join(sanitize_image_name(spelling)).join("rootfs");

    let literal = rootfs_for(image);
    if rootfs_is_populated(&literal) {
        return Some(literal);
    }

    zlayer_types::image_ref_candidates(image)
        .into_iter()
        .map(|(name, reference)| rootfs_for(&format!("{name}:{reference}")))
        .find(|candidate| rootfs_is_populated(candidate))
}
/// Cache directory: `<data_dir>/vz/linux/cache`.
#[allow(dead_code)]
fn cache_dir(&self) -> PathBuf {
    let mut path = self.linux_dir();
    path.push("cache");
    path
}
/// Opens the local registry stored under `<data_dir>/registry`.
///
/// Failure is not fatal: a warning is logged and `None` is returned.
async fn open_local_registry(&self) -> Option<std::sync::Arc<zlayer_registry::LocalRegistry>> {
    let registry_path = self.data_dir.join("registry");
    let opened = zlayer_registry::LocalRegistry::new(registry_path).await;
    opened
        .map_err(|e| {
            tracing::warn!(error = %e, "vz-linux: failed to open local registry; \
                imported/built images will be invisible");
        })
        .ok()
        .map(std::sync::Arc::new)
}
/// Backfills the `image-config.json` sidecar next to `rootfs` when missing.
///
/// Does nothing when `rootfs` has no parent directory or the sidecar already
/// exists. Otherwise a puller is built on the shared blob cache (plus the
/// local registry when it can be opened) and asked to write the sidecar.
/// Every failure on this path is logged rather than returned.
async fn ensure_image_config_sidecar(
    &self,
    image: &str,
    rootfs: &std::path::Path,
    auth: Option<&RegistryAuth>,
    source: zlayer_spec::SourcePolicy,
) {
    let image_dir = match rootfs.parent() {
        Some(dir) if !dir.join("image-config.json").exists() => dir,
        _ => return,
    };

    let cache_path = self.images_dir().join("blobs.redb");
    let opened = zlayer_registry::CacheType::persistent_at(&cache_path)
        .build()
        .await;
    let blob_cache = match opened {
        Ok(cache) => cache,
        Err(_) => {
            tracing::debug!(image = %image, "vz-linux: sidecar backfill skipped (blob cache open failed)");
            return;
        }
    };

    let base_puller =
        zlayer_registry::ImagePuller::from_env_for_runtime(blob_cache, source).await;
    let puller = match self.open_local_registry().await {
        Some(reg) => base_puller.with_local_registry(reg),
        None => base_puller,
    };

    let pull_auth = Self::resolve_pull_auth(auth, image);
    write_image_config_sidecar(&puller, image, &pull_auth, image_dir).await;
}
/// Directory caching the extracted kernel bundle: `<data_dir>/vz/linux/kernel`.
fn kernel_cache_dir(&self) -> PathBuf {
    let mut path = self.linux_dir();
    path.push("kernel");
    path
}
/// Resolves the kernel and initramfs used to boot guests.
///
/// Resolution order:
/// 1. `ZLAYER_VZ_LINUX_KERNEL` + `ZLAYER_VZ_LINUX_INITRD`, when both are set
///    and both paths exist.
/// 2. The kernel bundle image, pulled into the kernel cache directory.
/// 3. Whatever `Image` / `initramfs.cpio.gz` is already in the cache
///    directory, when the pull fails.
///
/// # Errors
/// Returns `AgentError::Configuration` when none of the above yields a kernel.
async fn ensure_linux_kernel(&self) -> Result<LinuxKernel> {
    let override_kernel = std::env::var_os("ZLAYER_VZ_LINUX_KERNEL").map(PathBuf::from);
    let override_initrd = std::env::var_os("ZLAYER_VZ_LINUX_INITRD").map(PathBuf::from);
    if let Some((image, initramfs)) = override_kernel.zip(override_initrd) {
        if !(image.exists() && initramfs.exists()) {
            tracing::warn!(
                "vz-linux: ZLAYER_VZ_LINUX_KERNEL/_INITRD set but a path does not exist; \
                 falling back to the pulled bundle / on-disk cache"
            );
        } else {
            tracing::info!(
                kernel = %image.display(),
                initramfs = %initramfs.display(),
                "vz-linux: using kernel from ZLAYER_VZ_LINUX_KERNEL/_INITRD override"
            );
            return Ok(LinuxKernel { image, initramfs });
        }
    }

    let cache = self.kernel_cache_dir();
    let pull_error = match self.pull_kernel_bundle(&cache).await {
        Ok(kernel) => return Ok(kernel),
        Err(e) => e,
    };

    // The pull failed: fall back to a previously extracted bundle if present.
    let image = cache.join("Image");
    let initramfs = cache.join("initramfs.cpio.gz");
    if image.exists() && initramfs.exists() {
        tracing::warn!(
            kernel = %image.display(),
            initramfs = %initramfs.display(),
            error = %pull_error,
            bundle = VZ_LINUX_BUNDLE_IMAGE,
            "vz-linux: registry unreachable; using cached kernel bundle \
             (which may be stale)"
        );
        return Ok(LinuxKernel { image, initramfs });
    }

    tracing::warn!(
        error = %pull_error,
        bundle = VZ_LINUX_BUNDLE_IMAGE,
        "vz-linux: kernel-bundle pull failed and no cache present"
    );
    Err(AgentError::Configuration(format!(
        "vz-linux kernel artifact not found; the bundle image {bundle} could not be pulled \
         (the GHCR package must be public) — set ZLAYER_VZ_LINUX_KERNEL/_INITRD or run \
         images/vz-linux/build.sh and place Image+initramfs.cpio.gz in {cache}",
        bundle = VZ_LINUX_BUNDLE_IMAGE,
        cache = cache.display(),
    )))
}
/// Pulls the kernel bundle image and installs `Image` / `initramfs.cpio.gz`
/// (and, best-effort, `manifest.json`) into `cache_dir`.
///
/// Extraction is skipped when both files already exist and the cached digest
/// matches the remote one (see `kernel_bundle_needs_extract`). Files are
/// staged under `cache_dir/.extract.tmp` and moved into place one at a time
/// via `install_atomically`.
///
/// # Errors
/// `AgentError::PullFailed` for cache-dir, blob-cache, pull, unpack, and
/// digest-write failures; `AgentError::Configuration` when the bundle lacks
/// the expected `vz-linux/Image` or `vz-linux/initramfs.cpio.gz`.
#[allow(clippy::too_many_lines)]
async fn pull_kernel_bundle(&self, cache_dir: &std::path::Path) -> Result<LinuxKernel> {
let image = VZ_LINUX_BUNDLE_IMAGE;
tokio::fs::create_dir_all(cache_dir)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("create kernel cache dir: {e}"),
})?;
// The blob cache is the same `blobs.redb` used for container images.
let cache_path = self.images_dir().join("blobs.redb");
let blob_cache = zlayer_registry::CacheType::persistent_at(&cache_path)
.build()
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("open blob cache: {e}"),
})?;
let pull_auth = Self::resolve_pull_auth(None, image);
let mut puller = zlayer_registry::ImagePuller::from_env_for_runtime(
Arc::clone(&blob_cache),
zlayer_spec::SourcePolicy::default(),
)
.await;
if let Some(reg) = self.open_local_registry().await {
puller = puller.with_local_registry(reg);
}
let layers =
puller
.pull_image(image, &pull_auth)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("pull kernel bundle layers: {e}"),
})?;
// Read the manifest digest back from the blob cache; any lookup or UTF-8
// failure degrades to `None`.
// NOTE(review): presumably `pull_image` stores this entry — confirm.
let digest_key = zlayer_registry::manifest_digest_cache_key(image);
let remote_digest = blob_cache
.get(&digest_key)
.await
.ok()
.flatten()
.and_then(|bytes| String::from_utf8(bytes).ok());
let kernel_path = cache_dir.join("Image");
let initrd_path = cache_dir.join("initramfs.cpio.gz");
let digest_path = cache_dir.join("digest");
// Digest recorded by the previous successful install, if any.
let cached_digest = tokio::fs::read_to_string(&digest_path)
.await
.ok()
.map(|s| s.trim().to_string());
let need_extract = kernel_bundle_needs_extract(
remote_digest.as_deref(),
cached_digest.as_deref(),
kernel_path.exists(),
initrd_path.exists(),
);
if !need_extract {
tracing::info!(
kernel = %kernel_path.display(),
initramfs = %initrd_path.display(),
digest = remote_digest.as_deref().unwrap_or("<unknown>"),
"vz-linux: kernel bundle already current; using on-disk cache"
);
return Ok(LinuxKernel {
image: kernel_path,
initramfs: initrd_path,
});
}
tracing::info!(
image = %image,
layer_count = layers.len(),
digest = remote_digest.as_deref().unwrap_or("<unknown>"),
"vz-linux: extracting kernel bundle layers"
);
// Start from a clean staging directory; a leftover from an earlier failed
// run is discarded here.
let staging = cache_dir.join(".extract.tmp");
let _ = tokio::fs::remove_dir_all(&staging).await;
let mut unpacker = zlayer_registry::LayerUnpacker::new(staging.clone());
unpacker
.unpack_layers(&layers)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("extract kernel bundle: {e}"),
})?;
let src_kernel = staging.join("vz-linux").join("Image");
let src_initrd = staging.join("vz-linux").join("initramfs.cpio.gz");
let src_manifest = staging.join("vz-linux").join("manifest.json");
if !src_kernel.exists() || !src_initrd.exists() {
let _ = tokio::fs::remove_dir_all(&staging).await;
return Err(AgentError::Configuration(format!(
"vz-linux bundle image {image} is missing vz-linux/Image or \
vz-linux/initramfs.cpio.gz after extraction"
)));
}
// NOTE(review): if either install below fails, the `?` returns before the
// staging directory is removed; it is only cleaned up by the next call.
install_atomically(&src_kernel, &kernel_path).await?;
install_atomically(&src_initrd, &initrd_path).await?;
// The manifest is optional metadata; failing to install it is only logged.
if src_manifest.exists() {
let manifest_path = cache_dir.join("manifest.json");
if let Err(e) = install_atomically(&src_manifest, &manifest_path).await {
tracing::debug!(error = %e, "vz-linux: failed to install bundle manifest.json (non-fatal)");
}
}
// Record the digest last, via write-then-rename, so it only names a bundle
// whose kernel and initramfs were installed.
if let Some(digest) = remote_digest.as_deref() {
let tmp_digest = cache_dir.join("digest.tmp");
tokio::fs::write(&tmp_digest, digest.as_bytes())
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("write kernel digest: {e}"),
})?;
tokio::fs::rename(&tmp_digest, &digest_path)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("install kernel digest: {e}"),
})?;
}
let _ = tokio::fs::remove_dir_all(&staging).await;
tracing::info!(
kernel = %kernel_path.display(),
initramfs = %initrd_path.display(),
digest = remote_digest.as_deref().unwrap_or("<unknown>"),
"vz-linux: kernel bundle pulled and installed"
);
Ok(LinuxKernel {
image: kernel_path,
initramfs: initrd_path,
})
}
/// Per-container directory: `<data_dir>/vz/linux/<service>-<replica>`.
fn vm_dir(&self, id: &ContainerId) -> PathBuf {
    let dir_name = Self::container_dir_name(id);
    self.linux_dir().join(dir_name)
}
/// Returns the last `tail` lines of the container's console log as entries.
///
/// An unknown container or an unreadable log file yields an empty list, not
/// an error. Every entry is stamped with the current time and reported as
/// stdout.
async fn read_console(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
    let dir_name = Self::container_dir_name(id);
    let console_path = self
        .containers
        .read()
        .await
        .get(&dir_name)
        .map(|c| c.console_log.clone());
    let Some(console_path) = console_path else {
        return Ok(Vec::new());
    };

    let content = match tokio::fs::read_to_string(&console_path).await {
        Ok(text) => text,
        Err(_) => String::new(),
    };
    let skipped = content.lines().count().saturating_sub(tail);
    let entries = content
        .lines()
        .skip(skipped)
        .map(|line| LogEntry {
            timestamp: chrono::Utc::now(),
            stream: LogStream::Stdout,
            source: LogSource::Container(dir_name.clone()),
            message: line.to_string(),
            service: None,
            deployment: None,
        })
        .collect();
    Ok(entries)
}
/// Returns up to `tail` log entries: the console tail followed by the
/// chunks captured in the container's `RunOutcome`, trimmed to the last
/// `tail` entries of the combined list.
async fn captured_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
    let dir_name = Self::container_dir_name(id);
    let mut merged = self.read_console(id, tail).await.unwrap_or_default();

    let outcome = self
        .containers
        .read()
        .await
        .get(&dir_name)
        .map(|c| Arc::clone(&c.outcome));
    if let Some(captured) = outcome.as_ref().and_then(|o| o.logs.lock().ok()) {
        merged.extend(captured.iter().cloned());
    }

    let excess = merged.len().saturating_sub(tail);
    merged.drain(..excess);
    Ok(merged)
}
async fn spawn_port_forwards(&self, dir_name: &str) {
let (ports, live): (Vec<(u16, zlayer_spec::PortProtocol)>, Option<LiveVm>) = {
let guard = self.containers.read().await;
let Some(c) = guard.get(dir_name) else {
return;
};
let mut seen = std::collections::HashSet::new();
let ports = c
.spec
.port_mappings
.iter()
.filter(|m| m.protocol == zlayer_spec::PortProtocol::Tcp)
.filter_map(|m| {
if seen.insert(m.container_port) {
Some((m.container_port, m.protocol))
} else {
None
}
})
.collect();
let live = c.live.as_ref().map(|l| LiveVm {
queue: l.queue.clone(),
vm: Arc::clone(&l.vm),
});
(ports, live)
};
let Some(live) = live else {
return;
};
if ports.is_empty() {
return;
}
let mut forwards = Vec::with_capacity(ports.len());
for (container_port, protocol) in ports {
let containers = Arc::clone(&self.containers);
let dir = dir_name.to_string();
let live_for_task = LiveVm {
queue: live.queue.clone(),
vm: Arc::clone(&live.vm),
};
let task = tokio::spawn(async move {
run_port_forward(containers, dir, live_for_task, container_port).await;
});
forwards.push(PortForward {
container_port,
protocol,
task,
});
}
let mut guard = self.containers.write().await;
if let Some(c) = guard.get_mut(dir_name) {
c.port_forwards = forwards;
} else {
for f in forwards {
f.task.abort();
}
}
}
}
/// Accepts TCP connections on `127.0.0.1:<container_port>` and tunnels each
/// one to the guest over vsock.
///
/// The loop exits once the container is gone from `containers` or no longer
/// has a live VM; this is re-checked at least once per second because
/// `accept` is wrapped in a one-second timeout. Each accepted connection is
/// handed to a blocking task that runs `tunnel_connection_over_vsock`.
///
/// A failure to bind the listener is logged and ends the forwarder.
async fn run_port_forward(
    containers: Arc<RwLock<HashMap<String, VzLinuxContainer>>>,
    dir_name: String,
    live: LiveVm,
    container_port: u16,
) {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::net::TcpListener;

    // Pause after a failed `accept` so a persistent error (for example file
    // descriptor exhaustion) cannot turn this loop into a busy spin.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), container_port);
    let listener = match TcpListener::bind(bind_addr).await {
        Ok(l) => l,
        Err(e) => {
            tracing::warn!(
                container = %dir_name,
                container_port,
                error = %e,
                "vz-linux: failed to bind loopback forwarder"
            );
            return;
        }
    };
    tracing::info!(
        container = %dir_name,
        %bind_addr,
        container_port,
        "vz-linux: loopback->vsock forwarder up"
    );
    loop {
        // Stop forwarding once the container is removed or its VM is gone.
        {
            let guard = containers.read().await;
            if guard.get(&dir_name).is_none_or(|c| c.live.is_none()) {
                return;
            }
        }
        let accept = tokio::time::timeout(Duration::from_secs(1), listener.accept()).await;
        let (inbound, _peer) = match accept {
            Ok(Ok(conn)) => conn,
            Ok(Err(e)) => {
                tracing::debug!(container = %dir_name, error = %e, "vz-linux: forward accept error");
                tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                continue;
            }
            // Timeout: loop around to re-check liveness.
            Err(_) => continue,
        };
        // The tunnel uses blocking I/O, so convert to a std socket and run it
        // off the async executor.
        let inbound_std = match inbound.into_std() {
            Ok(s) => s,
            Err(e) => {
                tracing::debug!(container = %dir_name, error = %e, "vz-linux: into_std failed");
                continue;
            }
        };
        let live_for_conn = LiveVm {
            queue: live.queue.clone(),
            vm: Arc::clone(&live.vm),
        };
        let dir = dir_name.clone();
        tokio::task::spawn_blocking(move || {
            if let Err(e) = inbound_std.set_nonblocking(false) {
                tracing::debug!(container = %dir, error = %e, "vz-linux: set_blocking failed");
                return;
            }
            tunnel_connection_over_vsock(&live_for_conn, inbound_std, container_port, &dir);
        });
    }
}
/// Relays one accepted TCP connection to the guest over a fresh vsock stream.
///
/// A `Forward { port }` frame is sent first; after that bytes are copied in
/// both directions (TCP→vsock on a helper thread, vsock→TCP on the calling
/// thread) until each direction ends. Every failure is logged at debug level
/// and simply ends the tunnel. This function blocks.
fn tunnel_connection_over_vsock(
    live: &LiveVm,
    inbound: std::net::TcpStream,
    container_port: u16,
    dir_name: &str,
) {
    let mut vsock = match connect_vsock(live) {
        Err(e) => {
            tracing::debug!(container = %dir_name, error = %e, "vz-linux: forward vsock connect failed");
            return;
        }
        Ok(stream) => stream,
    };

    let forward_request = proto::Msg::Forward {
        port: container_port,
    };
    if let Err(e) = proto::write_frame(&mut vsock, &forward_request) {
        tracing::debug!(container = %dir_name, error = %e, "vz-linux: forward send Forward failed");
        return;
    }

    // Each direction gets its own handle: the original reads, the clone writes.
    let mut to_guest = match vsock.try_clone() {
        Err(e) => {
            tracing::debug!(container = %dir_name, error = %e, "vz-linux: vsock clone failed");
            return;
        }
        Ok(writer) => writer,
    };
    let mut from_guest = vsock;

    let mut to_client = match inbound.try_clone() {
        Err(e) => {
            tracing::debug!(container = %dir_name, error = %e, "vz-linux: tcp clone failed");
            return;
        }
        Ok(writer) => writer,
    };
    let mut from_client = inbound;

    let upstream = std::thread::spawn(move || {
        let _ = std::io::copy(&mut from_client, &mut to_guest);
        let _ = to_guest.shutdown(std::net::Shutdown::Write);
    });

    let _ = std::io::copy(&mut from_guest, &mut to_client);
    let _ = to_client.shutdown(std::net::Shutdown::Write);
    let _ = upstream.join();
}
/// Turns an image reference into a single path component by replacing every
/// `/`, `:` and `@` with `_`.
fn sanitize_image_name(image: &str) -> String {
    image
        .chars()
        .map(|c| if matches!(c, '/' | ':' | '@') { '_' } else { c })
        .collect()
}
/// Reports whether `rootfs_dir` is a readable directory with at least one
/// entry. A missing path, a non-directory, or an unreadable directory all
/// count as "not populated".
fn rootfs_is_populated(rootfs_dir: &std::path::Path) -> bool {
    match std::fs::read_dir(rootfs_dir) {
        Ok(mut entries) => entries.next().is_some(),
        Err(_) => false,
    }
}
/// Decides whether the kernel bundle must be (re)extracted into the cache.
///
/// Extraction is needed when either artifact is missing on disk, or when a
/// remote digest is known and differs from the cached one (a missing cached
/// digest counts as different). With both artifacts present and no remote
/// digest, the cache is trusted.
fn kernel_bundle_needs_extract(
    remote_digest: Option<&str>,
    cached_digest: Option<&str>,
    kernel_present: bool,
    initrd_present: bool,
) -> bool {
    let artifacts_on_disk = kernel_present && initrd_present;
    if !artifacts_on_disk {
        return true;
    }
    remote_digest.is_some_and(|remote| cached_digest != Some(remote))
}
/// Installs `src` at `dst` by copying to a sibling temp file and renaming it
/// over `dst`, so readers never observe a partially written `dst`.
///
/// The temp file is `<dst file name>.tmp` next to `dst` (or `dst` with a
/// `tmp` extension when `dst` has no file name). If the copy or the rename
/// fails, the temp file is removed on a best-effort basis so a failed
/// install does not leave a stale staging file behind.
///
/// # Errors
/// Returns `AgentError::PullFailed` (attributed to the kernel bundle image)
/// when staging or the final rename fails.
async fn install_atomically(src: &std::path::Path, dst: &std::path::Path) -> Result<()> {
    let tmp = match dst.file_name() {
        Some(name) => {
            let mut t = name.to_os_string();
            t.push(".tmp");
            dst.with_file_name(t)
        }
        None => dst.with_extension("tmp"),
    };
    if let Err(e) = tokio::fs::copy(src, &tmp).await {
        // A failed copy may have left a truncated temp file.
        let _ = tokio::fs::remove_file(&tmp).await;
        return Err(AgentError::PullFailed {
            image: VZ_LINUX_BUNDLE_IMAGE.to_string(),
            reason: format!("stage {}: {e}", dst.display()),
        });
    }
    if let Err(e) = tokio::fs::rename(&tmp, dst).await {
        let _ = tokio::fs::remove_file(&tmp).await;
        return Err(AgentError::PullFailed {
            image: VZ_LINUX_BUNDLE_IMAGE.to_string(),
            reason: format!("install {}: {e}", dst.display()),
        });
    }
    Ok(())
}
/// Fetches the OCI config for `image` and stores it, pretty-printed, as
/// `image-config.json` inside `image_dir`.
///
/// Best-effort: fetch, serialization, and write failures are logged and the
/// function returns without writing.
async fn write_image_config_sidecar(
    puller: &zlayer_registry::ImagePuller,
    image: &str,
    auth: &zlayer_registry::RegistryAuth,
    image_dir: &std::path::Path,
) {
    let fetched = puller.pull_image_config(image, auth).await;
    let config = match fetched {
        Err(e) => {
            tracing::debug!(
                image = %image,
                error = %e,
                "vz-linux: failed to fetch OCI config blob; the image-default \
                 command/env will not apply (dispatch uses its macOS default OS hint)",
            );
            return;
        }
        Ok(config) => config,
    };

    let json = match serde_json::to_string_pretty(&config) {
        Err(e) => {
            tracing::warn!(image = %image, error = %e, "vz-linux: failed to serialize image config sidecar");
            return;
        }
        Ok(text) => text,
    };

    let sidecar = image_dir.join("image-config.json");
    tokio::fs::write(&sidecar, json).await.unwrap_or_else(|e| {
        tracing::warn!(
            image = %image,
            error = %e,
            "vz-linux: failed to write image-config.json sidecar; image-default command/env will not apply on run",
        );
    });
}
/// Loads the `image-config.json` sidecar stored next to `rootfs_dir`.
///
/// Returns `None` when `rootfs_dir` has no parent, the file cannot be read,
/// or its contents do not parse (the parse failure is logged at debug level).
fn read_image_config_sidecar(rootfs_dir: &std::path::Path) -> Option<zlayer_registry::ImageConfig> {
    let image_dir = rootfs_dir.parent()?;
    let sidecar = image_dir.join("image-config.json");
    let Ok(bytes) = std::fs::read(&sidecar) else {
        return None;
    };
    serde_json::from_slice::<zlayer_registry::ImageConfig>(&bytes)
        .map_err(|e| {
            tracing::debug!(
                sidecar = %sidecar.display(),
                error = %e,
                "vz-linux: failed to parse image-config.json sidecar; \
                 image-default command/env will not apply",
            );
        })
        .ok()
}
/// OCI image reference pulled by `pull_kernel_bundle` to obtain the guest
/// kernel and initramfs.
const VZ_LINUX_BUNDLE_IMAGE: &str = "ghcr.io/blackleafdigital/zlayer/vz-linux:arm64";
/// Base kernel command line passed to every guest.
const LINUX_CMDLINE: &str = "console=hvc0 rootfstag=rootfs rw";

/// Builds the guest kernel command line for a boot happening at `now`.
///
/// Appends `zlayer.boottime=<unix seconds>` to the base command line. When
/// `now` is earlier than the Unix epoch, the base command line is returned
/// unchanged.
fn build_linux_cmdline(now: std::time::SystemTime) -> String {
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => format!(
            "{LINUX_CMDLINE} zlayer.boottime={}",
            since_epoch.as_secs()
        ),
        Err(_) => LINUX_CMDLINE.to_string(),
    }
}
/// virtiofs tag under which the image rootfs directory is shared with the guest.
const VIRTIOFS_ROOTFS_TAG: &str = "rootfs";
/// Overall time budget for `connect_vsock`, across all of its retries.
const VSOCK_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Pause between failed vsock connect attempts.
const VSOCK_CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(250);
/// NOTE(review): presumably the interval between guest clock re-syncs; the
/// consumer is outside this part of the file — confirm.
const SETTIME_RESYNC_INTERVAL: Duration = Duration::from_secs(300);
/// Test helper: parses `mac` with `VZMACAddress` and returns the framework's
/// string form of it, or `None` when the framework rejects the input.
#[cfg(test)]
pub(crate) fn mac_roundtrip(mac: &str) -> Option<String> {
let mac_str = NSString::from_str(mac);
// `initWithString` yields `None` for an unparseable address, which `?` propagates.
let parsed = unsafe { VZMACAddress::initWithString(VZMACAddress::alloc(), &mac_str) }?;
Some(unsafe { parsed.string() }.to_string())
}
/// Test helper: builds the "not implemented" configuration error for `op`.
#[cfg(test)]
fn not_yet(op: &str) -> AgentError {
    let message = format!("vz-linux: {op} not implemented until a later phase");
    AgentError::Configuration(message)
}
/// A host directory shared into the guest and mounted at a path there.
#[derive(Clone, Debug)]
pub(crate) struct BindMount {
/// Host directory exposed through a virtiofs device.
pub(crate) host: PathBuf,
/// virtiofs tag identifying the share (generated as `zlmnt<N>` by
/// `derive_bind_mounts`); also sent to the guest in the `Mount` frame.
pub(crate) tag: String,
/// Mount point requested inside the guest.
pub(crate) target: String,
/// Whether the share is exposed read-only.
pub(crate) readonly: bool,
}
/// Everything `build_config_linux` needs to assemble a VM configuration.
#[derive(Clone)]
pub(crate) struct LinuxVmBuildInputs {
/// Kernel image and initramfs to boot.
pub(crate) kernel: LinuxKernel,
/// File that receives the guest's serial console output.
pub(crate) console_log: PathBuf,
/// Image rootfs directory, shared read-only under the rootfs virtiofs tag.
pub(crate) image_rootfs_dir: PathBuf,
/// Number of virtual CPUs.
pub(crate) cpu_count: usize,
/// Guest memory size in bytes.
pub(crate) memory_bytes: u64,
/// MAC address string for the NAT network device; ignored if it does not parse.
pub(crate) mac: String,
/// Additional host directories to expose, one virtiofs device each.
pub(crate) bind_mounts: Vec<BindMount>,
}
/// Assembles and validates a `VZVirtualMachineConfiguration` for a Linux guest.
///
/// The configuration has: the requested CPU count and memory size, a generic
/// platform, a Linux boot loader (kernel, initramfs, command line from
/// `build_linux_cmdline`), a serial console writing to `console_log`, a
/// read-only virtiofs share of the image rootfs plus one share per bind
/// mount, a NAT network device, an entropy device, and a vsock device.
///
/// # Errors
/// Returns the framework's localized description, prefixed with
/// `configuration invalid:`, when validation fails.
pub(crate) fn build_config_linux(
inputs: &LinuxVmBuildInputs,
) -> std::result::Result<Retained<VZVirtualMachineConfiguration>, String> {
unsafe {
let config = VZVirtualMachineConfiguration::new();
config.setCPUCount(inputs.cpu_count);
config.setMemorySize(inputs.memory_bytes);
let platform = VZGenericPlatformConfiguration::new();
let platform_super: &VZPlatformConfiguration = platform.as_super();
config.setPlatform(platform_super);
// Boot loader: kernel, initramfs, and a command line stamped with the
// current host time.
let kernel_url = file_url(&inputs.kernel.image);
let linux_boot =
VZLinuxBootLoader::initWithKernelURL(VZLinuxBootLoader::alloc(), &kernel_url);
let initrd_url = file_url(&inputs.kernel.initramfs);
linux_boot.setInitialRamdiskURL(Some(&initrd_url));
let cmdline_str = build_linux_cmdline(std::time::SystemTime::now());
let cmdline = NSString::from_str(&cmdline_str);
linux_boot.setCommandLine(&cmdline);
let boot: Retained<VZBootLoader> = Retained::into_super(linux_boot);
config.setBootLoader(Some(&boot));
// Create (or truncate) the console log so a write handle can be opened on
// it. If the handle cannot be opened, the VM is configured without a
// serial port and no error is reported.
let _ = std::fs::File::create(&inputs.console_log);
let log_str = NSString::from_str(&inputs.console_log.to_string_lossy());
if let Some(write_fh) = NSFileHandle::fileHandleForWritingAtPath(&log_str) {
let serial = VZVirtioConsoleDeviceSerialPortConfiguration::new();
let attach =
VZFileHandleSerialPortAttachment::initWithFileHandleForReading_fileHandleForWriting(
VZFileHandleSerialPortAttachment::alloc(),
None,
Some(&write_fh),
);
serial.setAttachment(Some(&attach));
let serial_super = Retained::into_super(serial);
let serial_arr = NSArray::from_retained_slice(&[serial_super]);
config.setSerialPorts(&serial_arr);
}
// Directory shares: the image rootfs first (always read-only), then one
// device per bind mount.
let mut fs_devices: Vec<Retained<VZDirectorySharingDeviceConfiguration>> = Vec::new();
let tag = NSString::from_str(VIRTIOFS_ROOTFS_TAG);
let fs = VZVirtioFileSystemDeviceConfiguration::initWithTag(
VZVirtioFileSystemDeviceConfiguration::alloc(),
&tag,
);
let rootfs_url = file_url(&inputs.image_rootfs_dir);
let dir =
VZSharedDirectory::initWithURL_readOnly(VZSharedDirectory::alloc(), &rootfs_url, true);
let share =
VZSingleDirectoryShare::initWithDirectory(VZSingleDirectoryShare::alloc(), &dir);
let share_super: Retained<VZDirectoryShare> = Retained::into_super(share);
fs.setShare(Some(&share_super));
let fs_super: Retained<VZDirectorySharingDeviceConfiguration> = Retained::into_super(fs);
fs_devices.push(fs_super);
for bind in &inputs.bind_mounts {
let bind_tag = NSString::from_str(&bind.tag);
let bind_fs = VZVirtioFileSystemDeviceConfiguration::initWithTag(
VZVirtioFileSystemDeviceConfiguration::alloc(),
&bind_tag,
);
let bind_url = file_url(&bind.host);
let bind_dir = VZSharedDirectory::initWithURL_readOnly(
VZSharedDirectory::alloc(),
&bind_url,
bind.readonly,
);
let bind_share = VZSingleDirectoryShare::initWithDirectory(
VZSingleDirectoryShare::alloc(),
&bind_dir,
);
let bind_share_super: Retained<VZDirectoryShare> = Retained::into_super(bind_share);
bind_fs.setShare(Some(&bind_share_super));
let bind_fs_super: Retained<VZDirectorySharingDeviceConfiguration> =
Retained::into_super(bind_fs);
fs_devices.push(bind_fs_super);
}
let fs_arr = NSArray::from_retained_slice(&fs_devices);
config.setDirectorySharingDevices(&fs_arr);
// NAT networking. An unparseable MAC string is skipped without an error,
// leaving the device's MAC address unset by this code.
let net = VZVirtioNetworkDeviceConfiguration::new();
let nat = VZNATNetworkDeviceAttachment::new();
net.setAttachment(Some(&nat));
let mac_str = NSString::from_str(&inputs.mac);
if let Some(mac) = VZMACAddress::initWithString(VZMACAddress::alloc(), &mac_str) {
net.setMACAddress(&mac);
}
let net_super: Retained<VZNetworkDeviceConfiguration> = Retained::into_super(net);
let net_arr = NSArray::from_retained_slice(&[net_super]);
config.setNetworkDevices(&net_arr);
let entropy = VZVirtioEntropyDeviceConfiguration::new();
let entropy_super: Retained<VZEntropyDeviceConfiguration> = Retained::into_super(entropy);
let entropy_arr = NSArray::from_retained_slice(&[entropy_super]);
config.setEntropyDevices(&entropy_arr);
// The vsock device carries the host<->guest control protocol.
let vsock = VZVirtioSocketDeviceConfiguration::new();
let vsock_super: Retained<VZSocketDeviceConfiguration> = Retained::into_super(vsock);
let vsock_arr = NSArray::from_retained_slice(&[vsock_super]);
config.setSocketDevices(&vsock_arr);
config
.validateWithError()
.map_err(|e| format!("configuration invalid: {}", e.localizedDescription()))?;
Ok(config)
}
}
/// Builds the `Run` control message for a workload.
///
/// Entrypoint, environment, working directory, and user are each resolved
/// from the service spec together with the optional image configuration by
/// the shared `macos_vz_shared` helpers.
pub(crate) fn build_run_message(
    spec: &ServiceSpec,
    image: Option<&zlayer_registry::ImageConfig>,
) -> proto::Msg {
    let command = macos_vz_shared::resolve_entrypoint(spec, image);
    let environment = macos_vz_shared::merge_env(spec, image);
    let workdir = macos_vz_shared::resolve_workdir(spec, image);
    let user = macos_vz_shared::resolve_user(spec, image);
    proto::Msg::Run {
        argv: command,
        env: environment,
        cwd: workdir,
        uid: user.0,
        gid: user.1,
    }
}
/// Collects the spec's bind-type storage entries as `BindMount`s.
///
/// Entries of other storage kinds are skipped. Tags are assigned as
/// `zlmnt0`, `zlmnt1`, … in the order the bind entries appear.
pub(crate) fn derive_bind_mounts(spec: &ServiceSpec) -> Vec<BindMount> {
    let mut mounts = Vec::new();
    for entry in spec.storage.iter() {
        if let zlayer_spec::StorageSpec::Bind {
            source,
            target,
            readonly,
        } = entry
        {
            // The tag index counts bind entries only, not all storage entries.
            let tag = format!("zlmnt{}", mounts.len());
            mounts.push(BindMount {
                host: PathBuf::from(source.clone()),
                tag,
                target: target.clone(),
                readonly: *readonly,
            });
        }
    }
    mounts
}
/// Opens a vsock connection to the guest's control port, retrying until
/// `VSOCK_CONNECT_TIMEOUT` has elapsed.
///
/// Each attempt is dispatched onto the VM's queue; the completion handler
/// duplicates the connection's file descriptor and sends it back over a
/// channel. Failed attempts are retried after `VSOCK_CONNECT_RETRY_INTERVAL`.
/// The wait for each attempt is bounded by the time left until the overall
/// deadline, so a completion handler that never fires cannot block the
/// caller indefinitely.
///
/// This function blocks the calling thread; call it from a blocking context.
///
/// # Errors
/// Returns a message that starts with `vsock connect timed out:` and carries
/// the most recent failure when no connection was established in time.
fn connect_vsock(live: &LiveVm) -> std::result::Result<UnixStream, String> {
    use std::sync::mpsc::{RecvTimeoutError, SendError};
    type FdResult = std::result::Result<std::os::unix::io::RawFd, String>;

    let deadline = Instant::now() + VSOCK_CONNECT_TIMEOUT;
    let mut last_err = "vsock connect never attempted".to_string();
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Err(format!("vsock connect timed out: {last_err}"));
        }
        let (tx, rx) = std::sync::mpsc::channel::<FdResult>();
        // Second sender for failures detected before the framework is ever
        // asked to connect; `tx` itself moves into the completion handler.
        let setup_tx = tx.clone();
        let vm = Arc::clone(&live.vm);
        live.queue.exec_async(move || {
            let block = RcBlock::new(
                move |conn: *mut VZVirtioSocketConnection, err: *mut NSError| {
                    if !err.is_null() {
                        // SAFETY: `err` was checked to be non-null and is
                        // supplied by the framework for the duration of this
                        // callback.
                        let msg = unsafe { (*err).localizedDescription() }.to_string();
                        let _ = tx.send(Err(msg));
                        return;
                    }
                    if conn.is_null() {
                        let _ = tx.send(Err("vsock connect: null connection".to_string()));
                        return;
                    }
                    // SAFETY: `conn` was checked to be non-null above.
                    let fd = unsafe { (*conn).fileDescriptor() };
                    // Duplicate the descriptor so the stream returned to the
                    // caller owns its own fd, independent of `conn`.
                    // SAFETY: `dup` has no memory-safety preconditions; a
                    // negative return is handled below.
                    let owned = unsafe { libc::dup(fd) };
                    if owned < 0 {
                        let e = std::io::Error::last_os_error();
                        let _ = tx.send(Err(format!("dup(vsock fd): {e}")));
                        return;
                    }
                    if let Err(SendError(Ok(unclaimed))) = tx.send(Ok(owned)) {
                        // The caller already gave up waiting; close the
                        // duplicate so it does not leak.
                        // SAFETY: `unclaimed` is the fd duplicated above and
                        // has not been handed to anyone else.
                        let _ = unsafe { libc::close(unclaimed) };
                    }
                },
            );
            // SAFETY: this closure runs on the VM's queue via `exec_async`,
            // which is how this file accesses the VM object elsewhere too.
            unsafe {
                let devices = vm.0.socketDevices();
                if devices.count() == 0 {
                    let _ = setup_tx
                        .send(Err("vsock connect: VM has no socket device".to_string()));
                    return;
                }
                let dev = devices.objectAtIndex(0);
                if let Ok(vsock) = dev.downcast::<VZVirtioSocketDevice>() {
                    vsock.connectToPort_completionHandler(proto::CONTROL_PORT, &block);
                } else {
                    let _ = setup_tx.send(Err(
                        "vsock connect: socket device is not a virtio socket device".to_string(),
                    ));
                }
            }
        });
        match rx.recv_timeout(remaining) {
            Ok(Ok(fd)) => {
                // SAFETY: `fd` is a freshly duplicated descriptor whose
                // ownership is transferred to the returned stream.
                return Ok(unsafe { UnixStream::from_raw_fd(fd) });
            }
            Ok(Err(e)) => last_err = e,
            Err(RecvTimeoutError::Disconnected) => {
                last_err = "vsock connect channel closed".to_string();
            }
            Err(RecvTimeoutError::Timeout) => {
                return Err(format!("vsock connect timed out: {last_err}"));
            }
        }
        std::thread::sleep(VSOCK_CONNECT_RETRY_INTERVAL);
    }
}
/// Records one chunk of workload output.
///
/// The raw bytes are appended to `console_log` (best-effort; open and write
/// errors are ignored) and a `LogEntry` holding the lossily decoded text is
/// pushed onto `outcome.logs`. If the log mutex is poisoned the entry is
/// dropped.
fn capture_chunk(
    outcome: &RunOutcome,
    console_log: &std::path::Path,
    dir_name: &str,
    stream: LogStream,
    bytes: &[u8],
) {
    let message = String::from_utf8_lossy(bytes).into_owned();

    let _ = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(console_log)
        .and_then(|mut file| file.write_all(bytes));

    let Ok(mut entries) = outcome.logs.lock() else {
        return;
    };
    let entry = LogEntry {
        timestamp: chrono::Utc::now(),
        stream,
        source: LogSource::Container(dir_name.to_string()),
        message,
        service: None,
        deployment: None,
    };
    entries.push(entry);
}
/// Starts the workload in the guest and drains its control stream until it
/// ends, recording results in `outcome`.
///
/// Sequence: connect over vsock, send `run_msg`, send one `Mount` frame per
/// bind mount, optionally start a thread that forwards stdin chunks, then
/// read frames until `Exited`, `Error`, or a read failure. This function
/// blocks the calling thread for the lifetime of the workload.
///
/// * `stdin_rx` — when present, chunks received on it are forwarded as
///   `Stdin` frames, followed by `StdinEof` once the sender is dropped.
/// * `console_log` / `dir_name` — passed through to `capture_chunk` for each
///   stdout/stderr frame.
#[allow(clippy::too_many_lines)]
fn run_and_drain(
live: &LiveVm,
run_msg: &proto::Msg,
bind_mounts: &[BindMount],
stdin_rx: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
outcome: &RunOutcome,
console_log: &std::path::Path,
dir_name: &str,
) {
let mut stream = match connect_vsock(live) {
Ok(s) => s,
Err(e) => {
if let Ok(mut t) = outcome.terminal.lock() {
*t = Some(ContainerState::Failed {
reason: format!("vsock connect failed: {e}"),
});
}
tracing::warn!(container = %dir_name, error = %e, "vz-linux: vsock connect failed");
return;
}
};
if let Err(e) = proto::write_frame(&mut stream, run_msg) {
if let Ok(mut t) = outcome.terminal.lock() {
*t = Some(ContainerState::Failed {
reason: format!("send Run failed: {e}"),
});
}
tracing::warn!(container = %dir_name, error = %e, "vz-linux: failed to send Run");
return;
}
// A failed Mount frame is only logged; the remaining mounts are still sent
// and the workload is not marked failed.
for bind in bind_mounts {
if let Err(e) = proto::write_frame(
&mut stream,
&proto::Msg::Mount {
tag: bind.tag.clone(),
target: bind.target.clone(),
readonly: bind.readonly,
},
) {
tracing::warn!(
container = %dir_name,
tag = %bind.tag,
target = %bind.target,
error = %e,
"vz-linux: failed to send Mount frame"
);
}
}
// Stdin is pumped on its own thread over a clone of the stream. The thread
// is never joined; it ends when the sender is dropped or a write fails.
let _stdin_handle = stdin_rx.and_then(|rx| match stream.try_clone() {
Ok(mut stdin_w) => Some(std::thread::spawn(move || {
while let Ok(chunk) = rx.recv() {
if proto::write_frame(&mut stdin_w, &proto::Msg::Stdin(chunk)).is_err() {
return;
}
}
let _ = proto::write_frame(&mut stdin_w, &proto::Msg::StdinEof);
})),
Err(e) => {
tracing::debug!(error = %e, "vz-linux: stdin stream clone failed; stdin disabled");
None
}
});
loop {
match proto::read_frame(&mut stream) {
Ok(proto::Msg::Stdout(b)) => {
capture_chunk(outcome, console_log, dir_name, LogStream::Stdout, &b);
}
Ok(proto::Msg::Stderr(b)) => {
capture_chunk(outcome, console_log, dir_name, LogStream::Stderr, &b);
}
Ok(proto::Msg::Started { pid }) => {
if let Ok(mut p) = outcome.pid.lock() {
*p = Some(pid);
}
tracing::debug!(container = %dir_name, pid, "vz-linux: workload started");
}
Ok(proto::Msg::Exited { code }) => {
record_exit(outcome, code);
tracing::debug!(container = %dir_name, code, "vz-linux: workload exited");
return;
}
Ok(proto::Msg::Error { message }) => {
if let Ok(mut t) = outcome.terminal.lock() {
*t = Some(ContainerState::Failed {
reason: message.clone(),
});
}
tracing::warn!(container = %dir_name, error = %message, "vz-linux: agent error");
return;
}
Ok(other) => {
tracing::trace!(container = %dir_name, tag = other.tag(), "vz-linux: unexpected frame");
}
Err(e) => {
// The stream ended without an `Exited` or `Error` frame. Unless a
// terminal state was already recorded, this is treated as a clean
// exit with code 0.
// NOTE(review): an abrupt disconnect is therefore reported as
// success — confirm this is intended.
let already = outcome
.terminal
.lock()
.ok()
.and_then(|g| g.clone())
.is_some();
if !already {
if let Ok(mut t) = outcome.terminal.lock() {
*t = Some(ContainerState::Exited { code: 0 });
}
if let Ok(mut c) = outcome.exit_code.lock() {
c.get_or_insert(0);
}
}
tracing::debug!(container = %dir_name, error = %e, "vz-linux: drain stream ended");
return;
}
}
}
}
/// Removes a container from the table and releases what it owns.
///
/// In order: the drain task and every port-forward task are aborted, the VM
/// (if live) is stopped on a blocking thread, and the container's state
/// directory is deleted. Errors from stopping the VM or deleting the
/// directory are ignored. Does nothing when `dir_name` is not in the table.
async fn teardown_container(
    containers: &Arc<RwLock<HashMap<String, VzLinuxContainer>>>,
    dir_name: &str,
) {
    // Take the entry out under the write lock; fields not extracted here are
    // dropped together with the removed entry.
    let removed = {
        let mut guard = containers.write().await;
        guard
            .remove(dir_name)
            .map(|c| (c.state_dir, c.live, c.drain_task, c.port_forwards))
    };
    let Some((state_dir, live, drain_task, port_forwards)) = removed else {
        return;
    };

    if let Some(task) = drain_task {
        task.abort();
    }
    port_forwards
        .into_iter()
        .for_each(|forward| forward.task.abort());

    if let Some(live) = live {
        let stop = tokio::task::spawn_blocking(move || {
            let result = run_vm_lifecycle(&live, VmLifecycleOp::Stop);
            drop(live);
            result
        });
        let _ = stop.await;
    }

    let _ = tokio::fs::remove_dir_all(state_dir).await;
}
/// Polls the container's outcome every 200 ms and tears the container down as
/// soon as a terminal state or an exit code has been recorded.
///
/// Returns without doing anything once the container is no longer in the map.
async fn watch_for_auto_remove(
    containers: Arc<RwLock<HashMap<String, VzLinuxContainer>>>,
    dir_name: String,
) {
    loop {
        // Clone the shared outcome so the map lock is not held while inspecting it.
        let outcome = {
            let map = containers.read().await;
            let Some(entry) = map.get(&dir_name) else {
                return;
            };
            Arc::clone(&entry.outcome)
        };

        // A poisoned mutex counts as "nothing recorded".
        let has_terminal = outcome
            .terminal
            .lock()
            .is_ok_and(|slot| slot.is_some());
        let has_exit_code = outcome
            .exit_code
            .lock()
            .is_ok_and(|slot| slot.is_some());

        if !(has_terminal || has_exit_code) {
            tokio::time::sleep(Duration::from_millis(200)).await;
            continue;
        }

        tracing::info!(
            container = %dir_name,
            "vz-linux: delete_on_exit — workload terminal, removing container"
        );
        teardown_container(&containers, &dir_name).await;
        return;
    }
}
/// Sends `exec_msg` to the guest agent over a fresh vsock connection and
/// collects the command's output until it finishes.
///
/// Returns `(exit_code, stdout, stderr)`; both outputs are decoded lossily as
/// UTF-8. If the stream ends with an unexpected EOF before an `Exited` frame
/// arrives, the output gathered so far is returned with exit code `-1`.
///
/// # Errors
/// Returns a message when the connection fails, the request cannot be sent,
/// the guest reports an `Error` frame, or reading fails for any other reason.
fn exec_and_collect(
    live: &LiveVm,
    exec_msg: &proto::Msg,
) -> std::result::Result<(i32, String, String), String> {
    let mut conn = connect_vsock(live)?;
    if let Err(e) = proto::write_frame(&mut conn, exec_msg) {
        return Err(format!("send Exec failed: {e}"));
    }

    let mut out_buf = Vec::<u8>::new();
    let mut err_buf = Vec::<u8>::new();

    // The loop yields the exit code; hard failures return straight out of it.
    let exit_code = loop {
        match proto::read_frame(&mut conn) {
            Ok(proto::Msg::Stdout(bytes)) => out_buf.extend_from_slice(&bytes),
            Ok(proto::Msg::Stderr(bytes)) => err_buf.extend_from_slice(&bytes),
            Ok(proto::Msg::Exited { code }) => break code,
            Ok(proto::Msg::Error { message }) => {
                return Err(format!("guest exec error: {message}"));
            }
            // Frames that are irrelevant to an exec session are ignored.
            Ok(_) => {}
            Err(proto::ProtoError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                break -1;
            }
            Err(e) => return Err(format!("exec drain failed: {e}")),
        }
    };

    Ok((
        exit_code,
        String::from_utf8_lossy(&out_buf).into_owned(),
        String::from_utf8_lossy(&err_buf).into_owned(),
    ))
}
/// Splits a `user[:group]` string into `(uid, gid, user_name)`.
///
/// * `uid` / `gid` are set when the respective part parses as a `u32`.
/// * `user_name` carries the whole trimmed input whenever the user part is not
///   numeric, or a group part is present but not numeric; otherwise it is `None`.
/// * A missing, empty or whitespace-only input yields `(None, None, None)`.
fn parse_exec_user(user: Option<&str>) -> (Option<u32>, Option<u32>, Option<String>) {
    let raw = match user {
        Some(text) if !text.trim().is_empty() => text.trim(),
        _ => return (None, None, None),
    };

    // Only the first ':' separates user from group.
    let mut pieces = raw.splitn(2, ':');
    let user_part = pieces.next().unwrap_or(raw);
    let group_part = pieces.next();

    let uid: Option<u32> = user_part.parse().ok();
    let gid: Option<u32> = group_part.and_then(|text| text.parse().ok());

    // Fully numeric means: numeric user AND (no group given OR numeric group).
    let fully_numeric = uid.is_some() && (group_part.is_none() || gid.is_some());
    let user_name = (!fully_numeric).then(|| raw.to_string());

    (uid, gid, user_name)
}
/// Maps a signal name to its number, ignoring surrounding whitespace and
/// ASCII case.
///
/// Recognised names: `SIGHUP` (1), `SIGINT` (2), `SIGUSR1` (10), `SIGUSR2`
/// (12) and `SIGTERM` (15). Every other input maps to 9.
pub(crate) fn signal_number(name: &str) -> i32 {
    const KNOWN: [(&str, i32); 5] = [
        ("SIGHUP", 1),
        ("SIGINT", 2),
        ("SIGUSR1", 10),
        ("SIGUSR2", 12),
        ("SIGTERM", 15),
    ];
    let wanted = name.trim();
    KNOWN
        .iter()
        .find(|(label, _)| label.eq_ignore_ascii_case(wanted))
        .map_or(9, |&(_, number)| number)
}
/// Opens a vsock connection to the guest agent and sends a single
/// `Signal { signum }` frame.
///
/// # Errors
/// Returns a message when the connection cannot be opened or the frame cannot
/// be written.
fn signal_agent(live: &LiveVm, signum: i32) -> std::result::Result<(), String> {
    let request = proto::Msg::Signal { signum };
    let mut conn = connect_vsock(live)?;
    match proto::write_frame(&mut conn, &request) {
        Ok(_) => Ok(()),
        Err(e) => Err(format!("send Signal({signum}) failed: {e}")),
    }
}
/// Sends `msg` to the guest agent over a fresh vsock connection, half-closes
/// the write side, and reads until the guest closes its end.
///
/// Returns `Ok(true)` when a clean EOF was observed and `Ok(false)` when
/// reading stopped on an error (including the 5-second read timeout).
///
/// # Errors
/// Returns a message when the connection or the frame write fails.
fn push_overlay_agent(live: &LiveVm, msg: &proto::Msg) -> std::result::Result<bool, String> {
    use std::io::Read as _;

    let mut conn = connect_vsock(live)?;
    if let Err(e) = proto::write_frame(&mut conn, msg) {
        return Err(format!("send OverlayConfig failed: {e}"));
    }

    // Best effort: failures to set the timeout or to half-close are ignored.
    let _ = conn.set_read_timeout(Some(Duration::from_secs(5)));
    let _ = conn.shutdown(std::net::Shutdown::Write);

    // Discard whatever the guest sends; only the way the stream ends matters.
    let mut scratch = [0u8; 256];
    let guest_closed = loop {
        match conn.read(&mut scratch) {
            Ok(0) => break true,
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    Ok(guest_closed)
}
/// Sends the host's current wall-clock time (whole seconds since the Unix
/// epoch) to the guest agent as a `SetTime` frame, half-closes the write side,
/// and reads until the guest closes its end.
///
/// Returns `Ok(true)` when a clean EOF was observed and `Ok(false)` when
/// reading stopped on an error (including the 5-second read timeout).
///
/// # Errors
/// Returns a message when the host clock is before the Unix epoch, or when
/// the connection or the frame write fails.
fn push_settime(live: &LiveVm) -> std::result::Result<bool, String> {
    use std::io::Read as _;

    // Resolve the timestamp before connecting so a bad host clock fails early.
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| format!("host clock before UNIX_EPOCH: {e}"))?;
    // Saturate rather than wrap if the second count does not fit in an i64.
    let unix_secs = i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX);
    let request = proto::Msg::SetTime { unix_secs };

    let mut conn = connect_vsock(live)?;
    if let Err(e) = proto::write_frame(&mut conn, &request) {
        return Err(format!("send SetTime failed: {e}"));
    }

    // Best effort: failures to set the timeout or to half-close are ignored.
    let _ = conn.set_read_timeout(Some(Duration::from_secs(5)));
    let _ = conn.shutdown(std::net::Shutdown::Write);

    // Discard whatever the guest sends; only the way the stream ends matters.
    let mut scratch = [0u8; 256];
    let guest_closed = loop {
        match conn.read(&mut scratch) {
            Ok(0) => break true,
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    Ok(guest_closed)
}
/// Derives a gateway address string from a guest address.
///
/// For IPv4 the last octet is replaced by `1` (`a.b.c.d` becomes `a.b.c.1`).
/// Any IPv6 input yields the fixed string `192.168.64.1`.
fn nat_gateway_for(ip: IpAddr) -> String {
    let IpAddr::V4(v4) = ip else {
        return "192.168.64.1".to_string();
    };
    let [a, b, c, _] = v4.octets();
    format!("{a}.{b}.{c}.1")
}
/// Replaces the host part of a `host:port` endpoint with `gateway`.
///
/// Everything after the LAST `:` is treated as the port. An empty endpoint
/// yields an empty string, and an endpoint without any `:` is returned
/// unchanged.
fn rewrite_endpoint_host(endpoint: &str, gateway: &str) -> String {
    if endpoint.is_empty() {
        return String::new();
    }
    let Some(colon) = endpoint.rfind(':') else {
        return endpoint.to_owned();
    };
    let port = &endpoint[colon + 1..];
    format!("{gateway}:{port}")
}
/// Records a workload exit on `outcome`: stores `code` as the exit code, then
/// sets the terminal state to `Exited { code }`.
///
/// A poisoned mutex is skipped silently, leaving that field unchanged.
fn record_exit(outcome: &RunOutcome, code: i32) {
    let exited = ContainerState::Exited { code };
    // Exit code first, terminal state second.
    if let Ok(mut recorded_code) = outcome.exit_code.lock() {
        *recorded_code = Some(code);
    }
    if let Ok(mut recorded_state) = outcome.terminal.lock() {
        *recorded_state = Some(exited);
    }
}
/// Picks the container state to report, in priority order:
/// an explicit terminal state, then `Exited` built from a recorded exit code,
/// then `fallback`.
fn outcome_to_state(
    exit_code: Option<i32>,
    terminal: Option<ContainerState>,
    fallback: ContainerState,
) -> ContainerState {
    match (terminal, exit_code) {
        (Some(state), _) => state,
        (None, Some(code)) => ContainerState::Exited { code },
        (None, None) => fallback,
    }
}
#[async_trait::async_trait]
impl Runtime for VzLinuxRuntime {
/// Pulls `image` using the defaults: `PullPolicy::IfNotPresent`, no registry
/// credentials, and the default `SourcePolicy`.
///
/// Delegates entirely to `pull_image_with_policy`; see that method for the
/// error conditions.
async fn pull_image(&self, image: &str) -> Result<()> {
self.pull_image_with_policy(
image,
zlayer_spec::PullPolicy::IfNotPresent,
None,
zlayer_spec::SourcePolicy::default(),
)
.await
}
/// Makes `image` available as an unpacked rootfs directory under the images
/// directory and registers it in `self.image_rootfs`.
///
/// Policy handling:
/// * `Always` / `Newer` — always proceed to the pull below.
/// * `IfNotPresent` — if `resolve_existing_rootfs` finds a rootfs, refresh the
///   image-config sidecar, register the rootfs and return without pulling.
/// * `Never` — same short-circuit when a rootfs exists; otherwise fail.
///
/// # Errors
/// Returns `AgentError::PullFailed` when the policy is `Never` and no rootfs
/// is present, or when creating the rootfs directory, opening the blob cache,
/// pulling the layers, or unpacking them fails.
async fn pull_image_with_policy(
&self,
image: &str,
policy: zlayer_spec::PullPolicy,
auth: Option<&RegistryAuth>,
source: zlayer_spec::SourcePolicy,
) -> Result<()> {
// Images live at <images_dir>/<sanitized name>/rootfs.
let safe_name = sanitize_image_name(image);
let image_dir = self.images_dir().join(&safe_name);
let rootfs_dir = image_dir.join("rootfs");
match policy {
zlayer_spec::PullPolicy::Always | zlayer_spec::PullPolicy::Newer => {
// No short-circuit: fall through to the pull.
}
zlayer_spec::PullPolicy::IfNotPresent => {
if let Some(existing) = self.resolve_existing_rootfs(image) {
tracing::debug!(
image = %image,
rootfs = %existing.display(),
"vz-linux image already present; skipping pull"
);
self.ensure_image_config_sidecar(image, &existing, auth, source)
.await;
self.image_rootfs.write().await.insert(safe_name, existing);
return Ok(());
}
}
zlayer_spec::PullPolicy::Never => {
if let Some(existing) = self.resolve_existing_rootfs(image) {
self.ensure_image_config_sidecar(image, &existing, auth, source)
.await;
self.image_rootfs.write().await.insert(safe_name, existing);
return Ok(());
}
return Err(AgentError::PullFailed {
image: image.to_string(),
reason: "image not present (or its rootfs is empty) and pull policy is Never"
.to_string(),
});
}
}
tracing::info!(image = %image, "pulling image for vz-linux runtime");
tokio::fs::create_dir_all(&rootfs_dir)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("create rootfs dir: {e}"),
})?;
// One blob cache file (blobs.redb) directly under the images directory.
let cache_path = self.images_dir().join("blobs.redb");
let blob_cache = zlayer_registry::CacheType::persistent_at(&cache_path)
.build()
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("open blob cache: {e}"),
})?;
let pull_auth = Self::resolve_pull_auth(auth, image);
let mut puller =
zlayer_registry::ImagePuller::from_env_for_runtime(blob_cache, source).await;
// Attach the local registry to the puller when one can be opened.
if let Some(reg) = self.open_local_registry().await {
puller = puller.with_local_registry(reg);
}
let layers =
puller
.pull_image(image, &pull_auth)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("pull image layers: {e}"),
})?;
// The sidecar is written into the image directory, next to rootfs/.
write_image_config_sidecar(&puller, image, &pull_auth, &image_dir).await;
tracing::info!(
image = %image,
layer_count = layers.len(),
"extracting layers to vz-linux image rootfs"
);
// NOTE(review): under Always/Newer the layers are unpacked into whatever
// already exists in rootfs_dir; nothing here clears it first — confirm that
// LayerUnpacker handles a pre-populated target as intended.
let mut unpacker = zlayer_registry::LayerUnpacker::new(rootfs_dir.clone());
unpacker
.unpack_layers(&layers)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("extract rootfs: {e}"),
})?;
// Register the rootfs only after a successful unpack.
self.image_rootfs
.write()
.await
.insert(safe_name, rootfs_dir.clone());
tracing::info!(
image = %image,
rootfs = %rootfs_dir.display(),
"vz-linux image pulled successfully"
);
Ok(())
}
/// Registers a new container in the `Pending` state without starting a VM.
///
/// Resolves the image rootfs, validates that it exists and is populated,
/// creates the per-container state directory, attempts to clone the image
/// rootfs into it, generates a MAC address, writes the spec to `config.json`
/// (best effort), and inserts the container record into `self.containers`.
///
/// # Errors
/// Returns `AgentError::CreateFailed` when the image rootfs is missing or
/// empty, or when the state directory cannot be created.
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let state_dir = self.vm_dir(id);
let image_name = spec.image.name.to_string();
let safe_image = sanitize_image_name(&image_name);
// Lookup order: in-memory registry, then an on-disk rootfs, then the
// conventional <images_dir>/<name>/rootfs path (validated just below).
let image_rootfs = {
let images = self.image_rootfs.read().await;
images.get(&safe_image).cloned()
}
.or_else(|| self.resolve_existing_rootfs(&image_name))
.unwrap_or_else(|| self.images_dir().join(&safe_image).join("rootfs"));
if !image_rootfs.exists() {
return Err(AgentError::CreateFailed {
id: dir_name,
reason: format!(
"image rootfs not found at {}; pull the image first",
image_rootfs.display()
),
});
}
if !rootfs_is_populated(&image_rootfs) {
return Err(AgentError::CreateFailed {
id: dir_name,
reason: format!(
"image rootfs at {} is empty (a previous pull was interrupted or failed); \
re-pull the image — remove the empty directory if the re-pull keeps \
short-circuiting under PullPolicy::IfNotPresent",
image_rootfs.display()
),
});
}
// A missing sidecar is not fatal: only a warning is logged.
let image_config = read_image_config_sidecar(&image_rootfs);
if image_config.is_none() {
tracing::warn!(
container = %dir_name,
image = %image_name,
"vz-linux: no image-config.json sidecar found for image; the image's \
default ENTRYPOINT/CMD/ENV/WORKDIR/USER will NOT apply (re-pull the \
image to generate the sidecar). The spec command still runs.",
);
}
// NOTE(review): std::fs and clone_or_copy below are synchronous calls made
// from an async fn — confirm blocking the executor here is acceptable.
std::fs::create_dir_all(&state_dir).map_err(|e| AgentError::CreateFailed {
id: dir_name.clone(),
reason: format!("create state dir: {e}"),
})?;
let rootfs_dir = state_dir.join("rootfs");
let console_log = state_dir.join("console.log");
// A failed clone is logged at debug level only; creation still succeeds.
if let Err(e) = clone_or_copy(&image_rootfs, &rootfs_dir) {
tracing::debug!(
container = %dir_name,
error = %e,
"vz-linux: clonefile of image rootfs failed; per-container rootfs deferred to a later phase"
);
}
// NOTE(review): unsafe because the objc2 binding is declared unsafe; no
// pointers or borrowed state are passed in — confirm no further invariant.
let mac = unsafe {
VZMACAddress::randomLocallyAdministeredAddress()
.string()
.to_string()
};
// Best effort: serialization or write failures are ignored.
if let Ok(json) = serde_json::to_string_pretty(spec) {
std::fs::write(state_dir.join("config.json"), json).ok();
}
let vcpus = spec_vcpus(spec, 2);
let ram_mib = spec_memory_mib(spec, 512, 128);
// Everything VM- or task-related stays unset until start_container.
let container = VzLinuxContainer {
state: ContainerState::Pending,
state_dir,
rootfs_dir,
image_rootfs_dir: image_rootfs,
kernel: None,
console_log,
mac,
spec: spec.clone(),
image_config,
vcpus,
ram_mib,
started_at: None,
live: None,
outcome: Arc::new(RunOutcome::default()),
drain_task: None,
cleanup_task: None,
settime_task: None,
port_forwards: Vec::new(),
stdin_tx: None,
overlay_ip: None,
};
self.containers.write().await.insert(dir_name, container);
Ok(())
}
/// Boots the container's VM and launches its background tasks.
///
/// Steps, in order:
/// 1. Ensure a Linux kernel is available and snapshot the VM build inputs.
/// 2. Fail early when Virtualization.framework reports it is unsupported.
/// 3. Build the VM on its own dispatch queue and start it (blocking thread).
/// 4. Mark the container `Running` and build the run message.
/// 5. Spawn the blocking drain task that runs the workload and captures output.
/// 6. Spawn the periodic wall-clock resync task.
/// 7. If `delete_on_exit` is set, spawn the auto-remove watcher.
/// 8. Spawn the port forwards.
///
/// # Errors
/// Returns `AgentError::NotFound` when the container was never created or
/// disappears mid-start, and `AgentError::StartFailed` when virtualization is
/// unavailable or the VM build/start fails or panics. Errors from
/// `ensure_linux_kernel` are propagated as-is.
#[allow(clippy::too_many_lines)]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let kernel = self.ensure_linux_kernel().await?;
// Copy what the VM build needs so the read lock is not held during boot.
let inputs = {
let guard = self.containers.read().await;
let c = guard.get(&dir_name).ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "not created".to_string(),
})?;
LinuxVmBuildInputs {
kernel: kernel.clone(),
console_log: c.console_log.clone(),
image_rootfs_dir: c.image_rootfs_dir.clone(),
cpu_count: clamp_cpu_count(c.vcpus),
memory_bytes: clamp_memory_bytes(c.ram_mib),
mac: c.mac.clone(),
bind_mounts: derive_bind_mounts(&c.spec),
}
};
if !unsafe { VZVirtualMachine::isSupported() } {
return Err(AgentError::StartFailed {
id: dir_name,
reason: "Virtualization.framework is unavailable. On Apple Silicon, grant the \
com.apple.security.virtualization entitlement by signing the binary \
with `scripts/sign-vz.sh` (or build via `make build`, which auto-signs \
on macOS)."
.to_string(),
});
}
let dir_for_task = dir_name.clone();
let live = tokio::task::spawn_blocking(move || -> std::result::Result<LiveVm, String> {
// Each VM gets a dedicated dispatch queue named after the container.
let queue = DispatchQueue::new(&format!("com.zlayer.vz-linux.{dir_for_task}"), None);
let (tx, rx) = std::sync::mpsc::channel::<
std::result::Result<QueuePinned<Retained<VZVirtualMachine>>, String>,
>();
let inputs_q = inputs.clone();
let queue_for_vm = queue.clone();
// The VM object is constructed on the queue it is bound to; the result
// travels back to this thread through the channel.
queue.exec_sync(move || {
let built = build_config_linux(&inputs_q).map(|config| {
let vm = unsafe {
VZVirtualMachine::initWithConfiguration_queue(
VZVirtualMachine::alloc(),
&config,
&queue_for_vm,
)
};
QueuePinned(vm)
});
let _ = tx.send(built);
});
let pinned = rx
.recv()
.unwrap_or_else(|_| Err("VM build channel closed".to_string()))?;
let live = LiveVm {
queue,
vm: Arc::new(pinned),
};
run_vm_lifecycle(&live, VmLifecycleOp::Start)?;
Ok(live)
})
.await
// Outer error: the blocking task itself failed to join (panic).
.map_err(|e| AgentError::StartFailed {
id: dir_name.clone(),
reason: format!("VM start task panicked: {e}"),
})?
// Inner error: building or starting the VM failed.
.map_err(|e| AgentError::StartFailed {
id: dir_name.clone(),
reason: e,
})?;
// Extra handles to the same queue/VM for the background tasks.
let live_for_drain = LiveVm {
queue: live.queue.clone(),
vm: Arc::clone(&live.vm),
};
let live_for_settime = LiveVm {
queue: live.queue.clone(),
vm: Arc::clone(&live.vm),
};
let (run_msg, bind_mounts, outcome, console_log) = {
let mut guard = self.containers.write().await;
// NOTE(review): if the container vanished while the VM was booting, this
// returns with the VM already started and nothing here stops it — confirm
// that dropping the LiveVm handles is sufficient.
let c = guard
.get_mut(&dir_name)
.ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "vanished during start".to_string(),
})?;
c.kernel = Some(kernel);
c.live = Some(live);
c.state = ContainerState::Running;
c.started_at = Some(Instant::now());
(
build_run_message(&c.spec, c.image_config.as_ref()),
derive_bind_mounts(&c.spec),
Arc::clone(&c.outcome),
c.console_log.clone(),
)
};
// stdin is fed to the drain task through a std mpsc channel.
let (stdin_tx, stdin_rx) = std::sync::mpsc::channel::<Vec<u8>>();
let dir_for_drain = dir_name.clone();
let drain = tokio::task::spawn_blocking(move || {
run_and_drain(
&live_for_drain,
&run_msg,
&bind_mounts,
Some(stdin_rx),
&outcome,
&console_log,
&dir_for_drain,
);
});
{
let mut guard = self.containers.write().await;
if let Some(c) = guard.get_mut(&dir_name) {
c.drain_task = Some(drain);
c.stdin_tx = Some(stdin_tx);
}
}
let dir_for_settime = dir_name.clone();
let settime = tokio::spawn(async move {
let mut ticker = tokio::time::interval(SETTIME_RESYNC_INTERVAL);
// Consume one tick before entering the loop, so the first push happens on
// the second tick.
ticker.tick().await;
loop {
ticker.tick().await;
let live = LiveVm {
queue: live_for_settime.queue.clone(),
vm: Arc::clone(&live_for_settime.vm),
};
// push_settime does blocking socket I/O, so it runs off the async executor.
match tokio::task::spawn_blocking(move || push_settime(&live)).await {
Ok(Ok(acked)) => tracing::debug!(
container = %dir_for_settime,
guest_acked = acked,
"vz-linux: periodic wall-clock resync pushed to guest"
),
Ok(Err(e)) => tracing::warn!(
container = %dir_for_settime,
error = %e,
"vz-linux: periodic wall-clock resync failed; retrying next tick"
),
Err(e) => tracing::warn!(
container = %dir_for_settime,
error = %e,
"vz-linux: wall-clock resync task panicked; retrying next tick"
),
}
}
});
{
let mut guard = self.containers.write().await;
if let Some(c) = guard.get_mut(&dir_name) {
c.settime_task = Some(settime);
} else {
// Container is gone: the resync loop never ends on its own, so cancel it.
settime.abort();
}
}
let delete_on_exit = {
let guard = self.containers.read().await;
guard
.get(&dir_name)
.is_some_and(|c| c.spec.lifecycle.delete_on_exit)
};
if delete_on_exit {
let containers = Arc::clone(&self.containers);
let dir_for_cleanup = dir_name.clone();
let cleanup =
tokio::spawn(
async move { watch_for_auto_remove(containers, dir_for_cleanup).await },
);
let mut guard = self.containers.write().await;
if let Some(c) = guard.get_mut(&dir_name) {
c.cleanup_task = Some(cleanup);
} else {
cleanup.abort();
}
}
self.spawn_port_forwards(&dir_name).await;
Ok(())
}
/// Stops the container: graceful SIGTERM first, then a forced VM stop.
///
/// If the VM is live and no terminal state is recorded yet, SIGTERM is sent to
/// the guest agent and the outcome is polled every 100 ms until a terminal
/// state appears or `timeout` elapses. Afterwards the container state is
/// settled, port forwards and background tasks are aborted, and the VM is
/// stopped on a blocking thread. The container record itself is kept.
///
/// Returns `Ok(())` when the container is unknown; signal-delivery failures
/// are only logged, and the result of the VM stop is ignored.
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
let dir_name = Self::container_dir_name(id);
// Snapshot what the graceful phase needs, then drop the read lock.
let (live_for_signal, outcome, already_terminal) = {
let guard = self.containers.read().await;
let Some(c) = guard.get(&dir_name) else {
return Ok(());
};
let live = c.live.as_ref().map(|l| LiveVm {
queue: l.queue.clone(),
vm: Arc::clone(&l.vm),
});
let terminal = c.outcome.terminal.lock().ok().and_then(|g| g.clone());
(live, Arc::clone(&c.outcome), terminal)
};
if let Some(live) = &live_for_signal {
if already_terminal.is_none() {
let live_sig = LiveVm {
queue: live.queue.clone(),
vm: Arc::clone(&live.vm),
};
let signalled = tokio::task::spawn_blocking(move || {
signal_agent(&live_sig, signal_number("SIGTERM"))
})
.await;
match signalled {
Ok(Ok(())) => {
// Signal delivered: wait for the workload to record a terminal state,
// bounded by the caller's timeout.
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if outcome
.terminal
.lock()
.ok()
.and_then(|g| g.clone())
.is_some()
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Ok(Err(e)) => {
tracing::debug!(
container = %dir_name,
error = %e,
"vz-linux: graceful SIGTERM not delivered; forcing VM stop"
);
}
Err(e) => {
tracing::debug!(
container = %dir_name,
error = %e,
"vz-linux: SIGTERM task panicked; forcing VM stop"
);
}
}
}
}
let (live, drain, cleanup, settime) = {
let mut guard = self.containers.write().await;
let Some(c) = guard.get_mut(&dir_name) else {
return Ok(());
};
// Prefer what the workload recorded; fall back to Exited { code: 0 }.
let recorded = c.outcome.terminal.lock().ok().and_then(|g| g.clone());
let recorded_code = c.outcome.exit_code.lock().ok().and_then(|g| *g);
c.state = outcome_to_state(recorded_code, recorded, ContainerState::Exited { code: 0 });
for f in std::mem::take(&mut c.port_forwards) {
f.task.abort();
}
(
c.live.take(),
c.drain_task.take(),
c.cleanup_task.take(),
c.settime_task.take(),
)
};
if let Some(settime) = settime {
settime.abort();
}
if let Some(cleanup) = cleanup {
cleanup.abort();
}
// NOTE(review): drain_task comes from spawn_blocking; tokio documents that
// aborting a blocking task has no effect once it is running — confirm the
// drain thread ends on its own when the VM stops.
if let Some(drain) = drain {
drain.abort();
}
if let Some(live) = live {
let _ = tokio::task::spawn_blocking(move || {
let r = run_vm_lifecycle(&live, VmLifecycleOp::Stop);
drop(live);
r
})
.await;
}
Ok(())
}
/// Forgets the container and releases everything it owned.
///
/// Aborts the resync, auto-remove, drain and port-forward tasks, stops the VM
/// (if live) on a blocking thread, and deletes the state directory. Returns
/// `Ok(())` for an unknown container; cleanup failures are ignored.
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
    let dir_name = Self::container_dir_name(id);

    // Detach the record under the write lock; the lock is released (and the
    // rest of the record dropped) before anything is awaited.
    let (state_dir, live, drain, cleanup, settime, forwards) = {
        let mut map = self.containers.write().await;
        let Some(entry) = map.remove(&dir_name) else {
            return Ok(());
        };
        (
            entry.state_dir,
            entry.live,
            entry.drain_task,
            entry.cleanup_task,
            entry.settime_task,
            entry.port_forwards,
        )
    };

    if let Some(task) = settime {
        task.abort();
    }
    if let Some(task) = cleanup {
        task.abort();
    }
    if let Some(task) = drain {
        task.abort();
    }
    for forward in forwards {
        forward.task.abort();
    }

    if let Some(vm) = live {
        let _ = tokio::task::spawn_blocking(move || {
            let stopped = run_vm_lifecycle(&vm, VmLifecycleOp::Stop);
            drop(vm);
            stopped
        })
        .await;
    }

    let _ = tokio::fs::remove_dir_all(state_dir).await;
    Ok(())
}
/// Reports the container's current state.
///
/// Priority: a terminal state or exit code recorded by the workload wins;
/// otherwise, for a live VM, the state is derived from the VM's own state;
/// otherwise the stored state is returned.
///
/// # Errors
/// Returns `AgentError::NotFound` for an unknown container.
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
    let dir_name = Self::container_dir_name(id);
    let map = self.containers.read().await;
    let entry = map.get(&dir_name).ok_or_else(|| AgentError::NotFound {
        container: dir_name.clone(),
        reason: "not found".to_string(),
    })?;

    let exit_code = entry.outcome.exit_code.lock().ok().and_then(|slot| *slot);
    let terminal = entry
        .outcome
        .terminal
        .lock()
        .ok()
        .and_then(|slot| slot.clone());
    if terminal.is_some() || exit_code.is_some() {
        return Ok(outcome_to_state(exit_code, terminal, entry.state.clone()));
    }

    // Nothing recorded and no VM: the stored state is all there is.
    let Some(live) = entry.live.as_ref() else {
        return Ok(entry.state.clone());
    };

    let from_vm = match read_vm_state(live) {
        VZVirtualMachineState::Running | VZVirtualMachineState::Paused => ContainerState::Running,
        VZVirtualMachineState::Error => ContainerState::Failed {
            reason: "VM entered error state".to_string(),
        },
        // Keep an already-stored Exited state; otherwise report exit code 0.
        VZVirtualMachineState::Stopped => {
            if matches!(entry.state, ContainerState::Exited { .. }) {
                entry.state.clone()
            } else {
                ContainerState::Exited { code: 0 }
            }
        }
        _ => entry.state.clone(),
    };
    Ok(from_vm)
}
/// Returns captured log entries for the container by delegating to
/// `captured_logs` with the caller's `tail` value.
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
self.captured_logs(id, tail).await
}
/// Runs `cmd` inside the container with default exec options.
///
/// Builds an `ExecOptions` whose only non-default field is `command` and
/// delegates to `exec_with_opts`, returning its `(exit_code, stdout, stderr)`.
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
let opts = crate::runtime::ExecOptions {
command: cmd.to_vec(),
..Default::default()
};
self.exec_with_opts(id, &opts).await
}
/// Runs a command inside the running container via the guest agent and
/// returns `(exit_code, stdout, stderr)`.
///
/// The command's environment starts from the container spec's `env` and is
/// then overlaid with `opts.env` (`KEY=VALUE` strings).
///
/// # Errors
/// * `AgentError::Configuration` — empty command, no live VM, workload already
///   terminal, the exec task panicked, or the exec itself failed.
/// * `AgentError::NotFound` — unknown container.
async fn exec_with_opts(
&self,
id: &ContainerId,
opts: &crate::runtime::ExecOptions,
) -> Result<(i32, String, String)> {
let dir_name = Self::container_dir_name(id);
if opts.command.is_empty() {
return Err(AgentError::Configuration(
"vz-linux: exec requires a non-empty command".to_string(),
));
}
// Validate and snapshot under the read lock; the exec runs after it is released.
let (live, mut env) = {
let guard = self.containers.read().await;
let c = guard.get(&dir_name).ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "not found".to_string(),
})?;
let live = c.live.as_ref().ok_or_else(|| {
AgentError::Configuration(format!(
"vz-linux: container {dir_name} is not running (no live VM / agent connection)"
))
})?;
if c.outcome
.terminal
.lock()
.ok()
.and_then(|g| g.clone())
.is_some()
{
return Err(AgentError::Configuration(format!(
"vz-linux: container {dir_name} workload has exited; nothing to exec into"
)));
}
let env: Vec<(String, String)> = c
.spec
.env
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
(
LiveVm {
queue: live.queue.clone(),
vm: Arc::clone(&live.vm),
},
env,
)
};
// Overlay per-exec variables: replace an existing key in place, append new
// ones. Entries without '=' are skipped.
for kv in &opts.env {
if let Some((k, v)) = kv.split_once('=') {
if let Some(slot) = env.iter_mut().find(|(ek, _)| ek == k) {
slot.1 = v.to_string();
} else {
env.push((k.to_string(), v.to_string()));
}
}
}
let (uid, gid, user) = parse_exec_user(opts.user.as_deref());
let exec_msg = proto::Msg::Exec {
argv: opts.command.clone(),
env,
cwd: opts.working_dir.clone(),
uid,
gid,
user,
};
let dir_for_task = dir_name.clone();
// exec_and_collect does blocking socket I/O, so it runs off the async executor.
tokio::task::spawn_blocking(move || exec_and_collect(&live, &exec_msg))
.await
.map_err(|e| {
AgentError::Configuration(format!(
"vz-linux: exec task for {dir_for_task} panicked: {e}"
))
})?
.map_err(|e| AgentError::Configuration(format!("vz-linux: exec failed: {e}")))
}
/// Returns a synthetic stats snapshot for the container.
///
/// Nothing is measured: the memory limit is the configured RAM in bytes,
/// memory usage is reported as a quarter of that limit (at least 1 byte), and
/// CPU usage is always 0.
///
/// # Errors
/// Returns `AgentError::NotFound` for an unknown container.
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
    let dir_name = Self::container_dir_name(id);
    let map = self.containers.read().await;
    let Some(entry) = map.get(&dir_name) else {
        return Err(AgentError::NotFound {
            container: dir_name.clone(),
            reason: "not found".to_string(),
        });
    };

    let limit_bytes = u64::from(entry.ram_mib) * 1024 * 1024;
    let used_bytes = std::cmp::max(limit_bytes / 4, 1);
    Ok(ContainerStats {
        cpu_usage_usec: 0,
        memory_bytes: used_bytes,
        memory_limit: limit_bytes,
        timestamp: Instant::now(),
    })
}
/// Returns a stream that yields exactly one synthetic stats sample.
///
/// The sample reports the configured vCPU count and memory limit, memory
/// usage as a quarter of the limit (at least 1 byte), and zero for every
/// CPU, network, block-I/O and pid counter.
///
/// # Errors
/// Returns `AgentError::NotFound` for an unknown container.
async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
    use futures_util::stream;

    let dir_name = Self::container_dir_name(id);
    // Copy the two sizing fields so the lock is released right away.
    let (vcpus, ram_mib) = {
        let map = self.containers.read().await;
        let Some(entry) = map.get(&dir_name) else {
            return Err(AgentError::NotFound {
                container: dir_name.clone(),
                reason: "not found".to_string(),
            });
        };
        (entry.vcpus, entry.ram_mib)
    };

    let mem_limit_bytes = u64::from(ram_mib) * 1024 * 1024;
    let only_sample = StatsSample {
        cpu_total_ns: 0,
        cpu_system_ns: 0,
        online_cpus: vcpus,
        mem_used_bytes: std::cmp::max(mem_limit_bytes / 4, 1),
        mem_limit_bytes,
        net_rx_bytes: 0,
        net_tx_bytes: 0,
        blkio_read_bytes: 0,
        blkio_write_bytes: 0,
        pids_current: 0,
        pids_limit: None,
        timestamp: chrono::Utc::now(),
    };
    let samples = vec![Ok(only_sample)];
    Ok(Box::pin(stream::iter(samples)))
}
/// Waits until the container's workload is finished and returns its exit code.
///
/// Polls every 200 ms. Returns, in order of precedence:
/// * the recorded exit code, when there is one;
/// * `-1` when the recorded terminal state is `Failed`;
/// * `0` when the container is unknown, has no live VM, or its VM is stopped.
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
    let dir_name = Self::container_dir_name(id);
    loop {
        {
            // The read lock is scoped to this block so it is not held while sleeping.
            let map = self.containers.read().await;
            let Some(entry) = map.get(&dir_name) else {
                return Ok(0);
            };

            let recorded_code = entry.outcome.exit_code.lock().ok().and_then(|slot| *slot);
            if let Some(code) = recorded_code {
                return Ok(code);
            }

            let failed = matches!(
                entry.outcome.terminal.lock().ok().and_then(|slot| slot.clone()),
                Some(ContainerState::Failed { .. })
            );
            if failed {
                return Ok(-1);
            }

            match entry.live.as_ref() {
                None => return Ok(0),
                Some(live) if read_vm_state(live) == VZVirtualMachineState::Stopped => {
                    return Ok(0);
                }
                Some(_) => {}
            }
        }
        tokio::time::sleep(Duration::from_millis(200)).await;
    }
}
/// Returns captured log entries for the container by delegating to
/// `captured_logs` with a fixed `tail` argument of 1000.
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
self.captured_logs(id, 1000).await
}
/// Streams the container's logs as `LogChunk`s.
///
/// Channel selection: a channel is included when it was requested, or when the
/// other one was not (so asking for neither yields both). Each entry becomes
/// one chunk with a trailing `\n`; the timestamp is attached only when
/// `opts.timestamps` is set.
///
/// * Without `follow`, the stream is a finite snapshot of the last
///   `opts.tail` entries (1000 when unset or not convertible to `usize`).
/// * With `follow`, the stream first replays the last `opts.tail` captured
///   entries (all of them when unset), then polls the workload log buffer
///   every 100 ms for new entries. It ends once the workload has recorded a
///   terminal state or exit code and every buffered entry has been emitted,
///   or when the container is removed.
///
/// # Errors
/// Propagates any error from `captured_logs`.
#[allow(clippy::too_many_lines)]
async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
    use futures_util::{stream, StreamExt as _};
    let want_stdout = opts.stdout || !opts.stderr;
    let want_stderr = opts.stderr || !opts.stdout;
    let with_ts = opts.timestamps;
    // Converts one entry to a chunk, or `None` when its channel is filtered out.
    let to_chunk = move |e: LogEntry| -> Option<Result<LogChunk>> {
        let channel = match e.stream {
            LogStream::Stdout => LogChannel::Stdout,
            LogStream::Stderr => LogChannel::Stderr,
        };
        let keep = match channel {
            LogChannel::Stdout => want_stdout,
            LogChannel::Stderr => want_stderr,
            LogChannel::Stdin => false,
        };
        if !keep {
            return None;
        }
        let mut bytes = e.message.into_bytes();
        bytes.push(b'\n');
        Some(Ok(LogChunk {
            stream: channel,
            bytes: bytes::Bytes::from(bytes),
            timestamp: with_ts.then_some(e.timestamp),
        }))
    };
    if !opts.follow {
        let tail = opts
            .tail
            .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
        let entries = self.captured_logs(id, tail).await?;
        let chunks: Vec<Result<LogChunk>> = entries.into_iter().filter_map(to_chunk).collect();
        return Ok(Box::pin(stream::iter(chunks)));
    }
    let dir_name = Self::container_dir_name(id);
    // The follow phase resumes from the current length of the workload buffer.
    // NOTE(review): entries appended between this snapshot and the
    // `captured_logs` call below may be delivered twice — confirm acceptable.
    let workload_seen = {
        let guard = self.containers.read().await;
        guard
            .get(&dir_name)
            .and_then(|c| c.outcome.logs.lock().ok().map(|b| b.len()))
            .unwrap_or(0)
    };
    let full = self.captured_logs(id, usize::MAX).await?;
    let total = full.len();
    let tail = opts
        .tail
        .map_or(total, |n| usize::try_from(n).unwrap_or(total));
    let seed_start = total.saturating_sub(tail);
    let initial: Vec<Result<LogChunk>> = full
        .into_iter()
        .skip(seed_start)
        .filter_map(to_chunk)
        .collect();
    let containers = Arc::clone(&self.containers);
    let seed_state = LogFollowState {
        containers,
        dir_name,
        next: workload_seen,
        done: false,
    };
    let follow = stream::unfold(seed_state, move |mut st| {
        async move {
            loop {
                if st.done {
                    return None;
                }
                let (entry, terminal): (Option<LogEntry>, bool) = {
                    let guard = st.containers.read().await;
                    let Some(c) = guard.get(&st.dir_name) else {
                        return None;
                    };
                    // Read the terminal flag BEFORE the log buffer. The
                    // workload's output is captured before its exit is
                    // recorded, so a terminal flag seen here guarantees the
                    // buffer read below already holds every entry. The
                    // opposite order could end the stream while the last
                    // entries were still unseen.
                    let terminal = c
                        .outcome
                        .terminal
                        .lock()
                        .ok()
                        .and_then(|t| t.clone())
                        .is_some()
                        || c.outcome.exit_code.lock().ok().and_then(|g| *g).is_some();
                    // Clone only the entry about to be emitted, not the whole buffer.
                    let entry = c
                        .outcome
                        .logs
                        .lock()
                        .ok()
                        .and_then(|b| b.get(st.next).cloned());
                    (entry, terminal)
                };
                if let Some(entry) = entry {
                    st.next += 1;
                    if let Some(chunk) = to_chunk(entry) {
                        return Some((chunk, st));
                    }
                    // Entry was filtered out by channel selection; try the next one.
                    continue;
                }
                if terminal {
                    st.done = true;
                    return None;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    });
    Ok(Box::pin(stream::iter(initial).chain(follow)))
}
/// Returns `Self::pseudo_pid(id)` when the container exists and has a live
/// VM, and `None` otherwise.
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
    let dir_name = Self::container_dir_name(id);
    let map = self.containers.read().await;
    let has_live_vm = map
        .get(&dir_name)
        .is_some_and(|entry| entry.live.is_some());
    if has_live_vm {
        Ok(Some(Self::pseudo_pid(id)))
    } else {
        Ok(None)
    }
}
/// Reports how this runtime attaches containers to the overlay network:
/// always `OverlayAttachKind::InGuestVsock`.
fn overlay_attach_kind(&self) -> OverlayAttachKind {
OverlayAttachKind::InGuestVsock
}
/// Sends the overlay network configuration to the guest agent over vsock and
/// records the container's overlay IP on success.
///
/// Every peer endpoint's host is rewritten to a gateway address derived from
/// the guest's current IP (falling back to `192.168.64.1` when that IP is
/// unknown); ports are kept.
///
/// # Errors
/// * `AgentError::NotFound` — unknown container.
/// * `AgentError::Network` — the VM is not live, the push task failed to
///   join, or sending the configuration failed.
async fn push_overlay_config(
&self,
id: &ContainerId,
config: &zlayer_types::overlayd::GuestOverlayConfig,
) -> Result<()> {
let dir_name = Self::container_dir_name(id);
// Snapshot the VM handle and MAC, then release the read lock.
let (live, mac) = {
let guard = self.containers.read().await;
let c = guard.get(&dir_name).ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "not found".to_string(),
})?;
let live = c.live.as_ref().map(|l| LiveVm {
queue: l.queue.clone(),
vm: Arc::clone(&l.vm),
});
(live, c.mac.clone())
};
let Some(live) = live else {
return Err(AgentError::Network(format!(
"cannot push overlay config to {dir_name}: VM is not live"
)));
};
let gateway = macos_vz_shared::current_guest_ip(&mac)
.await
.map_or_else(|| "192.168.64.1".to_string(), nat_gateway_for);
let msg = proto::Msg::OverlayConfig {
overlay_ip: config.overlay_ip.to_string(),
prefix_len: config.prefix_len,
private_key: config.private_key.clone(),
listen_port: config.listen_port,
peers: config
.peers
.iter()
.map(|p| proto::WgPeer {
public_key: p.public_key.clone(),
endpoint: rewrite_endpoint_host(&p.endpoint, &gateway),
allowed_ips: p.allowed_ips.clone(),
persistent_keepalive_secs: p.persistent_keepalive_secs,
})
.collect(),
dns_server: config.dns_server.map(|ip| ip.to_string()),
dns_domain: config.dns_domain.clone(),
};
// push_overlay_agent does blocking socket I/O, so it runs off the async executor.
let saw_eof = tokio::task::spawn_blocking(move || push_overlay_agent(&live, &msg))
.await
.map_err(|e| AgentError::Network(format!("overlay push task join failed: {e}")))?
.map_err(AgentError::Network)?;
// saw_eof is only logged; a missing EOF does not fail the push.
tracing::info!(
container = %dir_name,
guest_acked = saw_eof,
"pushed OverlayConfig to guest over vsock"
);
if let Some(c) = self.containers.write().await.get_mut(&dir_name) {
c.overlay_ip = Some(config.overlay_ip);
}
Ok(())
}
/// Returns the address at which the container can be reached.
///
/// `None` when the container is unknown or has no live VM. Otherwise, in
/// order of preference: the recorded overlay IP, the guest IP looked up from
/// the container's MAC address, and finally IPv4 localhost.
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
    use std::net::Ipv4Addr;

    let dir_name = Self::container_dir_name(id);
    let mac = {
        let map = self.containers.read().await;
        let Some(entry) = map.get(&dir_name).filter(|c| c.live.is_some()) else {
            return Ok(None);
        };
        if let Some(overlay) = entry.overlay_ip {
            return Ok(Some(overlay));
        }
        entry.mac.clone()
    };

    // The lookup runs after the read lock has been released.
    let address = match macos_vz_shared::current_guest_ip(&mac).await {
        Some(guest_ip) => guest_ip,
        None => IpAddr::V4(Ipv4Addr::LOCALHOST),
    };
    Ok(Some(address))
}
/// Lists the container's active port forwards.
///
/// Each forward is reported with host IP `127.0.0.1` and a host port equal to
/// the container port.
///
/// # Errors
/// Returns `AgentError::NotFound` for an unknown container.
async fn port_mappings_container(
    &self,
    id: &ContainerId,
) -> Result<Vec<crate::runtime::PortMappingEntry>> {
    let dir_name = Self::container_dir_name(id);
    let map = self.containers.read().await;
    let Some(entry) = map.get(&dir_name) else {
        return Err(AgentError::NotFound {
            container: dir_name.clone(),
            reason: "not found".to_string(),
        });
    };

    let mut mappings = Vec::with_capacity(entry.port_forwards.len());
    for forward in &entry.port_forwards {
        let container_port = forward.container_port;
        mappings.push(crate::runtime::PortMappingEntry {
            container_port,
            protocol: forward.protocol.as_str().to_string(),
            host_ip: Some("127.0.0.1".to_string()),
            host_port: Some(container_port),
        });
    }
    Ok(mappings)
}
/// Delivers `signal` (default `SIGKILL`) to the workload, then stops the
/// container with a zero timeout.
///
/// The signal is sent through the guest agent only when the VM is live and no
/// terminal state is recorded; delivery failures are logged, not returned.
/// `stop_container` is always called afterwards, whatever the signal was.
///
/// # Errors
/// Propagates the error from `validate_signal`, returns
/// `AgentError::NotFound` for an unknown container, and propagates the result
/// of `stop_container`.
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
let dir_name = Self::container_dir_name(id);
let canonical = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
// NOTE(review): signal_number maps every name it does not recognise to 9 —
// confirm validate_signal cannot return a name outside that table.
let signum = signal_number(&canonical);
let (live, running) = {
let guard = self.containers.read().await;
let c = guard.get(&dir_name).ok_or_else(|| AgentError::NotFound {
container: dir_name.clone(),
reason: "not found".to_string(),
})?;
let live = c.live.as_ref().map(|l| LiveVm {
queue: l.queue.clone(),
vm: Arc::clone(&l.vm),
});
// "running" here means: no terminal state has been recorded yet.
let running = c
.outcome
.terminal
.lock()
.ok()
.and_then(|g| g.clone())
.is_none();
(live, running)
};
match live {
Some(live) if running => {
let deliver =
tokio::task::spawn_blocking(move || signal_agent(&live, signum)).await;
match deliver {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::debug!(
container = %dir_name,
error = %e,
"vz-linux: kill signal not delivered; forcing VM stop"
),
Err(e) => tracing::debug!(
container = %dir_name,
error = %e,
"vz-linux: kill signal task panicked; forcing VM stop"
),
}
}
_ => {}
}
// NOTE(review): with a zero timeout, stop_container may send SIGTERM and
// then stops the VM without waiting, even for non-fatal signals such as
// SIGHUP/SIGUSR1 — confirm that "kill always stops the VM" is intended.
self.stop_container(id, Duration::from_secs(0)).await
}
/// Queues `data` for delivery to the workload's stdin.
///
/// # Errors
/// * `AgentError::NotFound` — unknown container.
/// * `AgentError::Internal` — stdin has no sender (never opened or already
///   closed), or the receiving side of the channel is gone.
async fn write_stdin(&self, id: &ContainerId, data: &[u8]) -> Result<()> {
    let dir_name = Self::container_dir_name(id);
    let map = self.containers.read().await;
    let Some(entry) = map.get(&dir_name) else {
        return Err(AgentError::NotFound {
            container: dir_name.clone(),
            reason: "not found".to_string(),
        });
    };
    let Some(sender) = entry.stdin_tx.as_ref() else {
        return Err(AgentError::Internal(format!(
            "stdin is closed for container {dir_name}"
        )));
    };
    // The channel owns its payload, so the bytes are copied.
    if sender.send(data.to_vec()).is_err() {
        return Err(AgentError::Internal(format!(
            "stdin receiver gone for container {dir_name} (workload exited?)"
        )));
    }
    Ok(())
}
/// Closes the container's stdin by dropping the stored sender.
///
/// Calling it again on an already closed stdin is a no-op.
///
/// # Errors
/// Returns `AgentError::NotFound` for an unknown container.
async fn close_stdin(&self, id: &ContainerId) -> Result<()> {
    let dir_name = Self::container_dir_name(id);
    let mut map = self.containers.write().await;
    match map.get_mut(&dir_name) {
        Some(entry) => {
            entry.stdin_tx = None;
            Ok(())
        }
        None => Err(AgentError::NotFound {
            container: dir_name.clone(),
            reason: "not found".to_string(),
        }),
    }
}
/// Lists the images registered in `self.image_rootfs`.
///
/// Each map key is reported as the image `reference`; digest and size are
/// always `None`.
async fn list_images(&self) -> Result<Vec<crate::runtime::ImageInfo>> {
    let known = self.image_rootfs.read().await;
    let mut listing = Vec::with_capacity(known.len());
    for reference in known.keys() {
        listing.push(crate::runtime::ImageInfo {
            reference: reference.clone(),
            digest: None,
            size_bytes: None,
        });
    }
    Ok(listing)
}
/// Pauses the container's VM.
///
/// # Errors
/// * `AgentError::NotFound` (reason `"not running"`) — the container is
///   unknown or has no live VM.
/// * `AgentError::Internal` — the pause task failed to join, or the pause
///   operation itself failed.
async fn pause_container(&self, id: &ContainerId) -> Result<()> {
    let dir_name = Self::container_dir_name(id);

    // Take a private handle to the VM so the lock is not held during the pause.
    let live = {
        let map = self.containers.read().await;
        match map.get(&dir_name).and_then(|entry| entry.live.as_ref()) {
            Some(vm) => LiveVm {
                queue: vm.queue.clone(),
                vm: Arc::clone(&vm.vm),
            },
            None => {
                return Err(AgentError::NotFound {
                    container: dir_name,
                    reason: "not running".to_string(),
                });
            }
        }
    };

    let joined =
        tokio::task::spawn_blocking(move || run_vm_lifecycle(&live, VmLifecycleOp::Pause)).await;
    match joined {
        Err(e) => Err(AgentError::Internal(format!("pause task: {e}"))),
        Ok(paused) => paused.map_err(|e| AgentError::Internal(format!("pause: {e}"))),
    }
}
/// Resumes the VM backing container `id`, then pushes a wall-clock resync to
/// the guest.
///
/// The resync is best-effort: if it fails, or its blocking task cannot be
/// joined, the problem is only logged and the call still returns `Ok(())`.
///
/// # Errors
/// - `AgentError::NotFound` (reason `"not running"`) when the container has
///   no record or the record holds no live VM.
/// - `AgentError::Internal` when the blocking resume task cannot be joined or
///   the resume operation reports an error.
async fn unpause_container(&self, id: &ContainerId) -> Result<()> {
    let dir_name = Self::container_dir_name(id);
    let handle = {
        let containers = self.containers.read().await;
        containers
            .get(&dir_name)
            .and_then(|record| record.live.as_ref())
            .map(|running| LiveVm {
                queue: running.queue.clone(),
                vm: Arc::clone(&running.vm),
            })
    };
    let resume_vm = match handle {
        Some(vm) => vm,
        None => {
            return Err(AgentError::NotFound {
                container: dir_name,
                reason: "not running".to_string(),
            })
        }
    };
    // The resume closure takes ownership of its handle, so keep a second one
    // for the follow-up time sync.
    let settime_vm = LiveVm {
        queue: resume_vm.queue.clone(),
        vm: Arc::clone(&resume_vm.vm),
    };
    let resumed =
        tokio::task::spawn_blocking(move || run_vm_lifecycle(&resume_vm, VmLifecycleOp::Resume))
            .await;
    match resumed {
        Err(e) => return Err(AgentError::Internal(format!("resume task: {e}"))),
        Ok(Err(e)) => return Err(AgentError::Internal(format!("resume: {e}"))),
        Ok(Ok(_)) => {}
    }
    let resync = tokio::task::spawn_blocking(move || push_settime(&settime_vm)).await;
    match resync {
        Ok(Ok(acked)) => tracing::debug!(
            container = %dir_name,
            guest_acked = acked,
            "vz-linux: pushed wall-clock resync to guest after resume"
        ),
        Ok(Err(e)) => tracing::warn!(
            container = %dir_name,
            error = %e,
            "vz-linux: post-resume wall-clock resync failed; periodic task will retry"
        ),
        Err(e) => tracing::warn!(
            container = %dir_name,
            error = %e,
            "vz-linux: post-resume wall-clock resync task panicked; periodic task will retry"
        ),
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
/// `/`, `:` and `@` in an image reference are each replaced by `_`.
#[test]
fn sanitize_image_name_replaces_separators() {
    let cases = [
        (
            "docker.io/library/alpine:3.19",
            "docker.io_library_alpine_3.19",
        ),
        ("ghcr.io/o/r@sha256:abc", "ghcr.io_o_r_sha256_abc"),
    ];
    for (reference, expected) in cases {
        assert_eq!(
            sanitize_image_name(reference),
            expected,
            "reference: {reference}"
        );
    }
}
/// Sanitizing works on the literal reference: a short name is not expanded
/// to its `docker.io/library/...` spelling, so the two forms stay distinct.
#[test]
fn sanitize_image_name_is_literal_no_docker_hub_injection() {
    let literal_cases = [
        ("alpine", "alpine"),
        ("alpine:latest", "alpine_latest"),
        ("library/alpine:latest", "library_alpine_latest"),
        (
            "docker.io/library/alpine:latest",
            "docker.io_library_alpine_latest",
        ),
    ];
    for (reference, expected) in literal_cases {
        assert_eq!(
            sanitize_image_name(reference),
            expected,
            "reference: {reference}"
        );
    }
    let short = sanitize_image_name("alpine:latest");
    let qualified = sanitize_image_name("docker.io/library/alpine:latest");
    assert_ne!(short, qualified);
}
/// A spec built from the shorthand `alpine:latest` sanitizes to the literal
/// `alpine_latest` directory key.
#[test]
fn service_spec_shorthand_keys_literal_image_dir() {
    let spec = ServiceSpec::minimal("svc", "alpine:latest");
    let image_name = spec.image.name.to_string();
    let dir_key = sanitize_image_name(&image_name);
    assert_eq!(dir_key, "alpine_latest");
}
/// A rootfs stored under the short spelling (`alpine_latest`) is found both by
/// the short reference and by the fully qualified `docker.io/library/...`
/// reference; an unrelated image resolves to nothing.
#[test]
fn resolve_existing_rootfs_finds_cross_spelling() {
    let scratch = std::env::temp_dir().join(format!(
        "zlayer-vz-resolve-test-{}-alpine",
        std::process::id()
    ));
    // Start from a clean slate in case an earlier run left the directory behind.
    let _ = std::fs::remove_dir_all(&scratch);
    let runtime = VzLinuxRuntime {
        data_dir: scratch.clone(),
        log_dir: scratch.join("logs"),
        containers: Arc::new(RwLock::new(HashMap::new())),
        image_rootfs: Arc::new(RwLock::new(HashMap::new())),
    };
    let stored = runtime.images_dir().join("alpine_latest").join("rootfs");
    let bin_dir = stored.join("bin");
    std::fs::create_dir_all(&bin_dir).unwrap();
    std::fs::write(bin_dir.join("sh"), b"#!/bin/sh\n").unwrap();
    for spelling in ["alpine:latest", "docker.io/library/alpine:latest"] {
        assert_eq!(
            runtime.resolve_existing_rootfs(spelling),
            Some(stored.clone())
        );
    }
    assert!(runtime.resolve_existing_rootfs("nginx:latest").is_none());
    let _ = std::fs::remove_dir_all(&scratch);
}
/// `rootfs_is_populated` is false for a missing directory and for an empty
/// one, and true once the directory contains something.
#[test]
fn rootfs_is_populated_distinguishes_empty_from_nonempty() {
    let scratch =
        std::env::temp_dir().join(format!("zlayer-vzlinux-rootfs-test-{}", std::process::id()));
    let missing_dir = scratch.join("does-not-exist");
    let empty_dir = scratch.join("empty");
    let full_dir = scratch.join("full");
    let full_bin = full_dir.join("bin");
    std::fs::create_dir_all(&empty_dir).unwrap();
    std::fs::create_dir_all(&full_bin).unwrap();
    std::fs::write(full_bin.join("sh"), b"#!/bin/sh\n").unwrap();
    assert!(!rootfs_is_populated(&missing_dir));
    assert!(!rootfs_is_populated(&empty_dir));
    assert!(rootfs_is_populated(&full_dir));
    let _ = std::fs::remove_dir_all(&scratch);
}
/// For a minimal spec, `spec_vcpus(&spec, 2)` yields 2 and
/// `spec_memory_mib(&spec, 512, 128)` yields at least 128.
#[test]
fn spec_defaults_are_sane() {
    let spec = ServiceSpec::minimal("svc", "docker.io/library/alpine:3.19");
    let vcpus = spec_vcpus(&spec, 2);
    let memory_mib = spec_memory_mib(&spec, 512, 128);
    assert_eq!(vcpus, 2);
    assert!(memory_mib >= 128);
}
/// The static command line names the `hvc0` console and the `rootfs` tag, and
/// does not point the root at `/dev/vda`.
#[test]
fn linux_cmdline_routes_console_and_marks_virtiofs_root() {
    for required in ["console=hvc0", "rootfstag=rootfs"] {
        assert!(LINUX_CMDLINE.contains(required));
    }
    let mentions_block_root = LINUX_CMDLINE.contains("root=/dev/vda");
    assert!(!mentions_block_root);
}
/// `build_linux_cmdline` keeps the static base and adds a `zlayer.boottime=`
/// token whose integer value is within 300 seconds of the supplied time.
#[test]
fn linux_cmdline_injects_recent_boottime() {
    let host_now = std::time::SystemTime::now();
    let now_secs = host_now
        .duration_since(std::time::UNIX_EPOCH)
        .expect("host clock is after UNIX_EPOCH")
        .as_secs();
    let cmdline = build_linux_cmdline(host_now);
    assert!(
        cmdline.contains("console=hvc0"),
        "cmdline lost its static base: {cmdline}"
    );
    // Locate the first whitespace-separated word carrying the boottime prefix.
    let mut boottime_token = None;
    for word in cmdline.split_whitespace() {
        if let Some(value) = word.strip_prefix("zlayer.boottime=") {
            boottime_token = Some(value);
            break;
        }
    }
    let Some(token) = boottime_token else {
        panic!("cmdline missing zlayer.boottime=: {cmdline}")
    };
    let boottime = match token.parse::<u64>() {
        Ok(secs) => secs,
        Err(e) => panic!("zlayer.boottime not an integer ({token:?}): {e}"),
    };
    let drift = now_secs.abs_diff(boottime);
    assert!(
        drift <= 300,
        "boottime {boottime} drifted {drift}s from now {now_secs} (>300s)"
    );
}
/// With a timestamp one second before the Unix epoch, the built command line
/// is exactly the static `LINUX_CMDLINE`, with no `zlayer.boottime=` token.
#[test]
fn linux_cmdline_falls_back_when_clock_pre_epoch() {
    let before_epoch = std::time::UNIX_EPOCH - Duration::from_secs(1);
    let built = build_linux_cmdline(before_epoch);
    assert_eq!(built, LINUX_CMDLINE);
    let has_boottime = built.contains("zlayer.boottime=");
    assert!(!has_boottime);
}
/// `not_yet("exec")` yields `AgentError::Configuration` whose message names
/// the runtime, the operation, and the "later phase" wording.
#[test]
fn not_yet_sentinel_is_configuration_error() {
    match not_yet("exec") {
        AgentError::Configuration(text) => {
            for needle in ["vz-linux", "exec", "later phase"] {
                assert!(text.contains(needle));
            }
        }
        other => panic!("expected Configuration sentinel, got {other:?}"),
    }
}
/// `parse_user` maps numeric `uid` / `uid:gid` strings to ids (a lone uid is
/// also used as the gid) and falls back to `(0, 0)` for absent, empty, or
/// name-based input.
#[test]
fn parse_user_handles_uid_gid_forms() {
    use macos_vz_shared::parse_user;
    let cases = [
        (None, (0, 0)),
        (Some(""), (0, 0)),
        (Some("1000"), (1000, 1000)),
        (Some("1000:2000"), (1000, 2000)),
        (Some("alice"), (0, 0)),
        (Some("alice:staff"), (0, 0)),
    ];
    for (input, expected) in cases {
        assert_eq!(parse_user(input), expected, "input: {input:?}");
    }
}
/// `parse_exec_user` returns `(uid, gid, name)`: numeric parts are parsed
/// here, while any input containing a name is passed through whole in the
/// third slot.
#[test]
fn parse_exec_user_defers_names_to_guest() {
    // The expected name is kept as `&str` and converted per case below.
    let cases = [
        (None, (None, None, None)),
        (Some(""), (None, None, None)),
        (Some(" "), (None, None, None)),
        (Some("1000"), (Some(1000), None, None)),
        (Some("1000:2000"), (Some(1000), Some(2000), None)),
        (Some("git"), (None, None, Some("git"))),
        (Some("git:git"), (None, None, Some("git:git"))),
        (Some("1000:staff"), (Some(1000), None, Some("1000:staff"))),
    ];
    for (input, (uid, gid, name)) in cases {
        assert_eq!(
            parse_exec_user(input),
            (uid, gid, name.map(str::to_string)),
            "input: {input:?}"
        );
    }
}
/// Known signal names map to their numbers (surrounding whitespace and lower
/// case are accepted); anything else, including the empty string, maps to 9.
#[test]
fn signal_number_maps_canonical_names_and_defaults_to_sigkill() {
    let cases = [
        ("SIGHUP", 1),
        ("SIGINT", 2),
        ("SIGKILL", 9),
        ("SIGUSR1", 10),
        ("SIGUSR2", 12),
        ("SIGTERM", 15),
        (" sigterm ", 15),
        ("SIGWINCH", 9),
        ("", 9),
    ];
    for (name, expected) in cases {
        assert_eq!(signal_number(name), expected, "signal name: {name:?}");
    }
}
/// Each listed signal name is accepted by `validate_signal`, and its
/// canonical form maps to a positive number.
#[test]
fn signal_number_roundtrips_validate_signal_output() {
    const KNOWN: [&str; 6] = [
        "SIGKILL", "SIGTERM", "SIGINT", "SIGHUP", "SIGUSR1", "SIGUSR2",
    ];
    for name in KNOWN {
        let canonical = crate::runtime::validate_signal(name).expect("known signal");
        let number = signal_number(&canonical);
        assert!(number > 0, "{name} mapped to non-positive {number}");
    }
}
/// With no image config, the run message is built from the spec alone:
/// entrypoint followed by args as argv, the workdir, the env entry, and the
/// numeric `uid:gid` from `spec.user`.
#[test]
fn build_run_message_carries_entrypoint_env_and_user() {
    let mut spec = ServiceSpec::minimal("svc", "docker.io/library/alpine:latest");
    spec.command.entrypoint = Some(vec![String::from("echo")]);
    spec.command.args = Some(vec![String::from("hello")]);
    spec.command.workdir = Some(String::from("/app"));
    spec.env.insert(String::from("FOO"), String::from("bar"));
    spec.user = Some(String::from("1000:1000"));
    let message = build_run_message(&spec, None);
    match message {
        proto::Msg::Run {
            argv,
            env,
            cwd,
            uid,
            gid,
        } => {
            assert_eq!(argv, ["echo", "hello"]);
            assert_eq!(cwd.as_deref(), Some("/app"));
            assert_eq!(uid, 1000);
            assert_eq!(gid, 1000);
            let foo_entry = (String::from("FOO"), String::from("bar"));
            assert!(env.contains(&foo_entry));
        }
        other => panic!("expected Msg::Run, got {other:?}"),
    }
}
/// Only `Bind` storage entries become bind mounts. They keep their order and
/// are tagged `zlmnt0`, `zlmnt1`, ... with the `Tmpfs` entry in between
/// skipped.
#[test]
fn derive_bind_mounts_maps_only_binds_with_sequential_tags() {
    let bind = |source: &str, target: &str, readonly: bool| zlayer_spec::StorageSpec::Bind {
        source: source.to_string(),
        target: target.to_string(),
        readonly,
    };
    let mut spec = ServiceSpec::minimal("svc", "docker.io/library/alpine:latest");
    spec.storage = vec![
        bind("/host/a", "/work", false),
        zlayer_spec::StorageSpec::Tmpfs {
            target: "/tmp".to_string(),
            size: None,
            mode: None,
        },
        bind("/host/b", "/data", true),
    ];
    let binds = derive_bind_mounts(&spec);
    assert_eq!(binds.len(), 2, "only the two Bind entries map to shares");
    let expected = [
        ("/host/a", "zlmnt0", "/work", false),
        ("/host/b", "zlmnt1", "/data", true),
    ];
    for (mount, (host, tag, target, readonly)) in binds.iter().zip(expected) {
        assert_eq!(mount.host, PathBuf::from(host));
        assert_eq!(mount.tag, tag);
        assert_eq!(mount.target, target);
        assert_eq!(mount.readonly, readonly);
    }
}
/// A minimal spec, with no storage set explicitly, yields no bind mounts.
#[test]
fn derive_bind_mounts_empty_when_no_binds() {
    let spec = ServiceSpec::minimal("svc", "docker.io/library/alpine:latest");
    let binds = derive_bind_mounts(&spec);
    assert!(binds.is_empty());
}
/// `capture_chunk` appends each chunk to the outcome's in-memory log, keeping
/// its stream, and also mirrors the bytes to the file path it is given.
#[test]
fn capture_chunk_records_stream_into_outcome_and_console() {
    let outcome = RunOutcome::default();
    let stamp = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default();
    let console_path =
        std::env::temp_dir().join(format!("vzl-capture-{}-{}.log", std::process::id(), stamp));
    let _ = std::fs::remove_file(&console_path);
    capture_chunk(
        &outcome,
        &console_path,
        "svc-0",
        LogStream::Stdout,
        b"HELLO\n",
    );
    capture_chunk(
        &outcome,
        &console_path,
        "svc-0",
        LogStream::Stderr,
        b"oops\n",
    );
    let recorded = outcome.logs.lock().expect("logs lock");
    assert_eq!(recorded.len(), 2, "both chunks must be captured");
    let (first, second) = (&recorded[0], &recorded[1]);
    assert_eq!(first.stream, LogStream::Stdout);
    assert!(first.message.contains("HELLO"));
    assert_eq!(second.stream, LogStream::Stderr);
    assert!(second.message.contains("oops"));
    let on_disk = std::fs::read_to_string(&console_path).unwrap_or_default();
    let mirrored = ["HELLO", "oops"]
        .iter()
        .all(|needle| on_disk.contains(*needle));
    assert!(
        mirrored,
        "captured output must also be mirrored to console.log; got {on_disk:?}"
    );
    let _ = std::fs::remove_file(&console_path);
}
/// An exit code stored by `record_exit` is readable from the outcome and is
/// what `outcome_to_state` reports, for a failing (42), a clean (0) and a
/// signal-style (137) exit alike.
#[test]
fn captured_exit_code_persists_into_container_state() {
    // Maps what `outcome` currently holds to a state, with `Running` as the fallback.
    let state_of = |outcome: &RunOutcome| {
        let terminal = outcome.terminal.lock().expect("terminal lock").clone();
        let captured = *outcome.exit_code.lock().expect("exit_code lock");
        outcome_to_state(captured, terminal, ContainerState::Running)
    };

    let failing = RunOutcome::default();
    record_exit(&failing, 42);
    assert_eq!(
        *failing.exit_code.lock().expect("exit_code lock"),
        Some(42),
        "wait path: captured exit code must be the real workload code"
    );
    assert_eq!(
        state_of(&failing),
        ContainerState::Exited { code: 42 },
        "state path must surface the captured non-zero exit code, not 0"
    );

    let clean = RunOutcome::default();
    record_exit(&clean, 0);
    assert_eq!(
        state_of(&clean),
        ContainerState::Exited { code: 0 },
        "exit 0 must map to Exited {{ code: 0 }}"
    );

    let killed = RunOutcome::default();
    record_exit(&killed, 137);
    assert_eq!(
        state_of(&killed),
        ContainerState::Exited { code: 137 },
        "SIGKILL death must surface 137 on the state path"
    );
}
/// Precedence in `outcome_to_state`: a terminal state beats a captured code,
/// a captured code beats the fallback, and the fallback is returned only when
/// neither is present.
#[test]
fn outcome_to_state_prefers_captured_outcome_over_fallback() {
    let over_running = outcome_to_state(Some(42), None, ContainerState::Running);
    assert_eq!(over_running, ContainerState::Exited { code: 42 });

    let over_forced_stop = outcome_to_state(Some(7), None, ContainerState::Exited { code: 0 });
    assert_eq!(
        over_forced_stop,
        ContainerState::Exited { code: 7 },
        "a captured code must override a forced-stop `Exited {{ code: 0 }}` fallback"
    );

    let agent_failure = ContainerState::Failed {
        reason: "agent error".to_string(),
    };
    let with_terminal = outcome_to_state(
        Some(0),
        Some(agent_failure.clone()),
        ContainerState::Running,
    );
    assert_eq!(with_terminal, agent_failure);

    let nothing_captured = outcome_to_state(None, None, ContainerState::Running);
    assert_eq!(nothing_captured, ContainerState::Running);
}
/// With neither a command in the spec nor an image config, the run message
/// uses argv `["true"]` and uid/gid 0.
#[test]
fn build_run_message_defaults_to_root_and_true() {
    let spec = ServiceSpec::minimal("svc", "docker.io/library/alpine:latest");
    let message = build_run_message(&spec, None);
    match message {
        proto::Msg::Run { argv, uid, gid, .. } => {
            assert_eq!(argv, ["true"]);
            assert_eq!(uid, 0);
            assert_eq!(gid, 0);
        }
        other => panic!("expected Msg::Run, got {other:?}"),
    }
}
/// When the spec sets no command, the image config supplies entrypoint + cmd
/// as argv, its env entries, its working directory and its numeric user.
#[test]
fn build_run_message_applies_image_entrypoint_and_env() {
    let spec = ServiceSpec::minimal("svc", "docker.io/library/postgres:16-alpine");
    let image = zlayer_registry::ImageConfig {
        entrypoint: Some(vec![String::from("docker-entrypoint.sh")]),
        cmd: Some(vec![String::from("postgres")]),
        env: Some(vec![String::from("PATH=/usr/local/bin:/usr/bin")]),
        working_dir: Some(String::from("/var/lib/postgresql")),
        user: Some(String::from("70:70")),
        ..Default::default()
    };
    let message = build_run_message(&spec, Some(&image));
    match message {
        proto::Msg::Run {
            argv,
            env,
            cwd,
            uid,
            gid,
        } => {
            assert_eq!(argv, ["docker-entrypoint.sh", "postgres"]);
            let path_entry = (
                String::from("PATH"),
                String::from("/usr/local/bin:/usr/bin"),
            );
            assert!(env.contains(&path_entry));
            assert_eq!(cwd.as_deref(), Some("/var/lib/postgresql"));
            assert_eq!(uid, 70);
            assert_eq!(gid, 70);
        }
        other => panic!("expected Msg::Run, got {other:?}"),
    }
}
/// The `rootfstag=` value in the static command line is the
/// `VIRTIOFS_ROOTFS_TAG` constant.
#[test]
fn virtiofs_tag_matches_cmdline_rootfstag() {
    let expected_arg = format!("rootfstag={VIRTIOFS_ROOTFS_TAG}");
    assert!(LINUX_CMDLINE.contains(expected_arg.as_str()));
}
/// When no kernel is configured through the environment or present in the
/// cache directory, `ensure_linux_kernel` either succeeds or fails with a
/// `Configuration` error that spells out the available options.
///
/// The test returns early when both env vars are set, or when both cached
/// artifacts already exist.
#[tokio::test]
async fn ensure_linux_kernel_errors_clearly_when_absent() {
    let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
    let env_configured = ["ZLAYER_VZ_LINUX_KERNEL", "ZLAYER_VZ_LINUX_INITRD"]
        .iter()
        .all(|key| std::env::var_os(key).is_some());
    if env_configured {
        return;
    }
    let cache = rt.kernel_cache_dir();
    let cache_populated = ["Image", "initramfs.cpio.gz"]
        .iter()
        .all(|artifact| cache.join(artifact).exists());
    if cache_populated {
        return;
    }
    let outcome = rt.ensure_linux_kernel().await;
    match outcome {
        Err(AgentError::Configuration(msg)) => {
            assert!(msg.contains("ZLAYER_VZ_LINUX_KERNEL/_INITRD"));
            assert!(msg.contains("Image+initramfs.cpio.gz"));
            assert!(msg.contains(VZ_LINUX_BUNDLE_IMAGE));
            assert!(msg.contains("must be public"));
        }
        Ok(_) => {}
        other => panic!("expected Configuration error or Ok, got {other:?}"),
    }
}
/// Decision table for `kernel_bundle_needs_extract`. Each row holds the four
/// arguments in call order, followed by the expected answer.
#[test]
fn kernel_bundle_needs_extract_decision_matrix() {
    const A: Option<&str> = Some("sha256:a");
    const B: Option<&str> = Some("sha256:b");
    let matrix = [
        (A, A, false, true, true),
        (A, A, true, false, true),
        (None, None, false, false, true),
        (A, A, true, true, false),
        (B, A, true, true, true),
        (B, None, true, true, true),
        (None, A, true, true, false),
        (None, None, true, true, false),
    ];
    for (first, second, third, fourth, expected) in matrix {
        assert_eq!(
            kernel_bundle_needs_extract(first, second, third, fourth),
            expected,
            "args: ({first:?}, {second:?}, {third}, {fourth})"
        );
    }
}
/// `mac_roundtrip` returns an equivalent address for a valid MAC (compared
/// after `normalize_mac_for_test`) and `None` for strings that are not MACs.
#[test]
fn mac_roundtrip_preserves_address_and_rejects_garbage() {
    // NOTE(review): both calls take no arguments; presumably they are `unsafe`
    // only because of how the framework bindings are generated — confirm.
    let generated = unsafe {
        VZMACAddress::randomLocallyAdministeredAddress()
            .string()
            .to_string()
    };
    let echoed = mac_roundtrip(&generated).expect("valid MAC must round-trip");
    assert_eq!(
        macos_vz_shared::normalize_mac_for_test(&echoed),
        macos_vz_shared::normalize_mac_for_test(&generated),
        "round-tripped MAC must match the original (modulo case/padding)"
    );
    let fixed = mac_roundtrip("0a:1b:2c:3d:4e:5f").expect("fixed MAC must parse");
    let fixed_normalized = macos_vz_shared::normalize_mac_for_test(&fixed);
    assert_eq!(fixed_normalized, "a:1b:2c:3d:4e:5f");
    for junk in ["not-a-mac", ""] {
        assert!(mac_roundtrip(junk).is_none());
    }
}
/// Integration test: starts an alpine container and checks that it reports
/// `Running` and produces at least one non-blank log line within 60 polls.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + a guest kernel (ZLAYER_VZ_LINUX_KERNEL/_INITRD)"]
async fn vz_linux_boots_to_console() {
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxtest", 1);
let spec = ServiceSpec::minimal("vzlinuxtest", "docker.io/library/alpine:3.19");
// Lifecycle under test: pull -> create -> start.
rt.pull_image("docker.io/library/alpine:3.19")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
rt.start_container(&id).await.expect("start");
let state = rt.container_state(&id).await.expect("state");
assert_eq!(state, ContainerState::Running, "VM should be Running");
// Poll the logs once per second, up to 60 times, until any non-blank line
// shows up. Log-read errors are treated as "nothing yet".
let mut wrote = false;
for _ in 0..60 {
let logs = rt.container_logs(&id, 200).await.unwrap_or_default();
if logs.iter().any(|l| !l.message.trim().is_empty()) {
wrote = true;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
assert!(wrote, "guest never wrote to the serial console");
// Teardown: stop with a 10 s timeout, then remove the container.
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
rt.remove_container(&id).await.expect("remove");
}
/// Integration test: runs `echo hello` as the container entrypoint and checks
/// that the workload exits 0 and that `hello` appears in the captured logs.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + vzagent kernel/initrd + alpine pull"]
async fn vz_linux_run_echo() {
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxecho", 1);
let mut spec = ServiceSpec::minimal("vzlinuxecho", "docker.io/library/alpine:latest");
// The whole command line is supplied through the entrypoint; no args are set.
spec.command.entrypoint = Some(vec!["echo".to_string(), "hello".to_string()]);
rt.pull_image("docker.io/library/alpine:latest")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
rt.start_container(&id).await.expect("start");
// Wait for the workload to finish, failing the test after 120 s.
let code = tokio::time::timeout(Duration::from_secs(120), rt.wait_container(&id))
.await
.expect("wait_container timed out")
.expect("wait_container");
assert_eq!(code, 0, "echo should exit 0");
let logs = rt.get_logs(&id).await.expect("logs");
assert!(
logs.iter().any(|l| l.message.contains("hello")),
"captured logs should contain 'hello', got: {:?}",
logs.iter().map(|l| &l.message).collect::<Vec<_>>()
);
// Teardown: stop with a 10 s timeout, then remove the container.
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
rt.remove_container(&id).await.expect("remove");
}
/// Integration test: runs `sh -c 'exit 7'` as the entrypoint and checks that
/// `wait_container` reports exit code 7.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + vzagent kernel/initrd + alpine pull"]
async fn vz_linux_run_exit_code() {
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxexit", 1);
let mut spec = ServiceSpec::minimal("vzlinuxexit", "docker.io/library/alpine:latest");
// A workload that terminates immediately with a distinctive non-zero code.
spec.command.entrypoint = Some(vec![
"sh".to_string(),
"-c".to_string(),
"exit 7".to_string(),
]);
rt.pull_image("docker.io/library/alpine:latest")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
rt.start_container(&id).await.expect("start");
// Wait for the workload to finish, failing the test after 120 s.
let code = tokio::time::timeout(Duration::from_secs(120), rt.wait_container(&id))
.await
.expect("wait_container timed out")
.expect("wait_container");
assert_eq!(code, 7, "sh -c 'exit 7' should propagate exit code 7");
// Teardown: stop with a 10 s timeout, then remove the container.
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
rt.remove_container(&id).await.expect("remove");
}
/// Integration test: with `sleep 600` running as the primary workload, execs
/// `echo hi` and checks exit code 0, `hi` on stdout and an empty stderr.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + vzagent kernel/initrd + alpine pull"]
async fn vz_linux_exec_echo() {
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxexec", 1);
let mut spec = ServiceSpec::minimal("vzlinuxexec", "docker.io/library/alpine:latest");
// A long sleep keeps the container alive for the exec below.
spec.command.entrypoint = Some(vec!["sleep".to_string(), "600".to_string()]);
rt.pull_image("docker.io/library/alpine:latest")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
rt.start_container(&id).await.expect("start");
// Fixed 15 s pause before checking state; presumably this gives the guest
// time to come up — confirm.
tokio::time::sleep(Duration::from_secs(15)).await;
assert_eq!(
rt.container_state(&id).await.expect("state"),
ContainerState::Running,
"container should be running before exec"
);
// The exec itself is bounded to 60 s.
let (code, stdout, stderr) = tokio::time::timeout(
Duration::from_secs(60),
rt.exec(&id, &["echo".to_string(), "hi".to_string()]),
)
.await
.expect("exec timed out")
.expect("exec");
assert_eq!(code, 0, "echo should exit 0");
assert!(
stdout.contains("hi"),
"exec stdout should contain 'hi', got: {stdout:?}"
);
assert_eq!(stderr, "", "exec stderr should be empty, got: {stderr:?}");
// Teardown: stop with a 10 s timeout, then remove the container.
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
rt.remove_container(&id).await.expect("remove");
}
/// Integration test: with `sleep 600` running as the primary workload, execs
/// `sh -c 'exit 42'` and checks that the exec reports exit code 42.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + vzagent kernel/initrd + alpine pull"]
async fn vz_linux_exec_exit_code() {
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxexeccode", 1);
let mut spec = ServiceSpec::minimal("vzlinuxexeccode", "docker.io/library/alpine:latest");
// A long sleep keeps the container alive for the exec below.
spec.command.entrypoint = Some(vec!["sleep".to_string(), "600".to_string()]);
rt.pull_image("docker.io/library/alpine:latest")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
rt.start_container(&id).await.expect("start");
// Fixed 15 s pause before checking state; presumably this gives the guest
// time to come up — confirm.
tokio::time::sleep(Duration::from_secs(15)).await;
assert_eq!(
rt.container_state(&id).await.expect("state"),
ContainerState::Running,
"container should be running before exec"
);
// Only the exit code matters here; stdout and stderr are ignored.
let (code, _stdout, _stderr) = tokio::time::timeout(
Duration::from_secs(60),
rt.exec(
&id,
&["sh".to_string(), "-c".to_string(), "exit 42".to_string()],
),
)
.await
.expect("exec timed out")
.expect("exec");
assert_eq!(code, 42, "sh -c 'exit 42' should propagate exit code 42");
// Teardown: stop with a 10 s timeout, then remove the container.
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
rt.remove_container(&id).await.expect("remove");
}
/// Integration test: publishes guest port 8080 on `127.0.0.1:8080`, then
/// checks that the runtime reports the loopback address as the container IP,
/// reports the port mapping, and that a TCP connection to the host port
/// receives the guest's banner.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + vzagent kernel/initrd + alpine pull"]
async fn vz_linux_guest_gets_ip_and_port_reachable() {
use std::io::Read as _;
use std::net::{IpAddr as StdIpAddr, Ipv4Addr, TcpStream};
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxnet", 1);
let mut spec = ServiceSpec::minimal("vzlinuxnet", "docker.io/library/alpine:latest");
// Guest workload: a shell loop that pipes a fixed HTTP 200 response into
// `nc` listening on port 8080, restarting the listener after each run.
spec.command.entrypoint = Some(vec![
"sh".to_string(),
"-c".to_string(),
"while true; do printf 'HTTP/1.1 200 OK\\r\\n\\r\\nhi' | nc -l -p 8080; done"
.to_string(),
]);
// Publish guest TCP port 8080 on host 127.0.0.1:8080.
spec.port_mappings = vec![zlayer_spec::PortMapping {
host_port: Some(8080),
container_port: 8080,
protocol: zlayer_spec::PortProtocol::Tcp,
host_ip: "127.0.0.1".to_string(),
}];
rt.pull_image("docker.io/library/alpine:latest")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
rt.start_container(&id).await.expect("start");
// Poll once per second, up to 60 times, until an address is reported.
// Errors and `None` both count as "not yet".
let mut ip = None;
for _ in 0..60 {
if let Ok(Some(addr)) = rt.get_container_ip(&id).await {
ip = Some(addr);
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
let ip = ip.expect("get_container_ip never returned an address");
// The reported address must be the host loopback, not a guest address.
assert_eq!(
ip,
StdIpAddr::V4(Ipv4Addr::LOCALHOST),
"vz-linux reachability is via host loopback, not a guest NAT IP"
);
let maps = rt.port_mappings_container(&id).await.expect("port maps");
assert!(
maps.iter().any(|m| m.container_port == 8080
&& m.host_port == Some(8080)
&& m.host_ip.as_deref() == Some("127.0.0.1")),
"port_mappings_container should report 127.0.0.1:8080, got: {maps:?}"
);
// Try up to 30 times, one second apart, to connect and read the banner.
// The blocking socket calls run on a blocking task.
let mut ok = false;
for _ in 0..30 {
let connect = tokio::task::spawn_blocking(move || {
let target = std::net::SocketAddr::new(StdIpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
// Connect and read each time out after 2 s; any failure yields `None`
// so the outer loop retries.
let mut s = TcpStream::connect_timeout(&target, Duration::from_secs(2)).ok()?;
s.set_read_timeout(Some(Duration::from_secs(2))).ok();
let mut buf = [0u8; 64];
let n = s.read(&mut buf).ok()?;
Some(String::from_utf8_lossy(&buf[..n]).into_owned())
})
.await
.expect("connect task");
if let Some(resp) = connect {
// Either the status code or the body text counts as success.
if resp.contains("200") || resp.contains("hi") {
ok = true;
break;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
assert!(
ok,
"127.0.0.1:8080 (vsock forwarder) was not reachable / did not serve the banner"
);
// Teardown: stop with a 10 s timeout, then remove the container.
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
rt.remove_container(&id).await.expect("remove");
}
/// Integration test covering one container end to end: create (`Pending`),
/// start (`Running`), log banner, stats, exec, stop (`Exited`), remove, and a
/// second remove that must also succeed.
///
/// Ignored by default. When run, it returns early (without failing) unless
/// both `ZLAYER_VZ_LINUX_KERNEL` and `ZLAYER_VZ_LINUX_INITRD` are set.
#[tokio::test]
#[ignore = "needs the virtualization entitlement + vzagent kernel/initrd + alpine pull"]
async fn vz_linux_full_lifecycle() {
// Only the presence of the two variables matters here; their values are unused.
let (Some(_kernel), Some(_initrd)) = (
std::env::var_os("ZLAYER_VZ_LINUX_KERNEL"),
std::env::var_os("ZLAYER_VZ_LINUX_INITRD"),
) else {
eprintln!("ZLAYER_VZ_LINUX_KERNEL/_INITRD unset — skipping");
return;
};
// NOTE(review): `isSupported` takes no arguments; presumably it is `unsafe`
// only because of the generated bindings — confirm.
assert!(
unsafe { VZVirtualMachine::isSupported() },
"Virtualization.framework not supported (missing entitlement?)"
);
let rt = VzLinuxRuntime::new(None).expect("construct VzLinuxRuntime");
let id = ContainerId::new("vzlinuxlifecycle", 1);
let mut spec = ServiceSpec::minimal("vzlinuxlifecycle", "docker.io/library/alpine:latest");
// Workload prints a banner the test can look for, then sleeps so the
// container stays up for the stats and exec checks.
spec.command.entrypoint = Some(vec![
"sh".to_string(),
"-c".to_string(),
"echo lifecycle-up; sleep 600".to_string(),
]);
rt.pull_image("docker.io/library/alpine:latest")
.await
.expect("pull alpine");
rt.create_container(&id, &spec).await.expect("create");
// A created but not yet started container reports `Pending`.
assert_eq!(
rt.container_state(&id).await.expect("state after create"),
ContainerState::Pending,
"state should be Pending before start"
);
rt.start_container(&id).await.expect("start");
assert_eq!(
rt.container_state(&id).await.expect("state after start"),
ContainerState::Running,
"state should be Running after start"
);
// Poll the logs once per second, up to 60 times, for the workload banner.
// Log-read errors are treated as "nothing yet".
let mut saw_banner = false;
for _ in 0..60 {
let logs = rt.container_logs(&id, 200).await.unwrap_or_default();
if logs.iter().any(|l| l.message.contains("lifecycle-up")) {
saw_banner = true;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
assert!(saw_banner, "primary banner never appeared in logs");
// Stats must report a non-zero memory usage and a non-zero limit.
let stats = rt.get_container_stats(&id).await.expect("stats");
assert!(stats.memory_bytes > 0, "memory_bytes must be > 0");
assert!(stats.memory_limit > 0, "memory_limit must be > 0");
// Exec alongside the running workload, bounded to 60 s.
let (code, stdout, _stderr) = tokio::time::timeout(
Duration::from_secs(60),
rt.exec(&id, &["echo".to_string(), "hi".to_string()]),
)
.await
.expect("exec timed out")
.expect("exec");
assert_eq!(code, 0, "exec echo should exit 0");
assert!(stdout.contains("hi"), "exec stdout should contain 'hi'");
rt.stop_container(&id, Duration::from_secs(10))
.await
.expect("stop");
// Any exit code is accepted after stop; only the `Exited` variant is checked.
let stopped = rt.container_state(&id).await.expect("state after stop");
assert!(
matches!(stopped, ContainerState::Exited { .. }),
"state after stop should be Exited, got: {stopped:?}"
);
rt.remove_container(&id).await.expect("remove");
// Once removed, querying the state must fail.
assert!(
rt.container_state(&id).await.is_err(),
"container_state must error after remove (record gone)"
);
// Removing an already-removed container must still return `Ok`.
rt.remove_container(&id)
.await
.expect("second remove is Ok (idempotent)");
}
}