use std::collections::VecDeque;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::assets::AssetPaths;
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
use crate::vmm::pool::{PoolClientError, WarmPool, WarmPoolError};
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
use crate::vmm::resources::VmResources;
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
use crate::vmm::runner::RunOptions;
/// Errors produced by this module.
///
/// `msg` is the text rendered by the `Display` impl; variants that carry a
/// `source` expose it through `std::error::Error::source`. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[non_exhaustive]
pub enum Error {
/// Rendered as `image: {msg}`.
Image {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `vm: {msg}`. Also the target of the `WarmPoolError` /
/// `PoolClientError` conversions on macOS/aarch64.
Vm {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `assets: {msg}`.
Assets {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Wrapped `std::io::Error`; produced by `From<std::io::Error>`.
Io(std::io::Error),
/// Rendered as `network: {msg}`.
Network {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `cache miss: {msg}`; never carries a source.
CacheMiss { msg: String },
/// Rendered as `cache invalid: {msg}`; never carries a source.
CacheInvalid { msg: String },
/// Rendered as `bake: {msg}`.
Bake {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `image not found ({image}): {msg}`.
ImageNotFound {
image: String,
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `registry auth failed for {image}: {msg}`.
RegistryAuth {
image: String,
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `registry unreachable: {msg}`.
RegistryUnreachable {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
/// Rendered as `pool exhausted: {msg}`; never carries a source.
PoolExhausted { msg: String },
/// Rendered as `kernel panic during bake: {first_line}` followed by one
/// indented line per `stack` entry. `Debug` prints only the number of
/// stack lines, not their contents.
KernelPanic {
first_line: String,
stack: Vec<String>,
},
}
impl Error {
pub(crate) fn image_msg(msg: impl Into<String>) -> Self {
Error::Image {
msg: msg.into(),
source: None,
}
}
pub(crate) fn vm_msg(msg: impl Into<String>) -> Self {
Error::Vm {
msg: msg.into(),
source: None,
}
}
pub(crate) fn assets_msg(msg: impl Into<String>) -> Self {
Error::Assets {
msg: msg.into(),
source: None,
}
}
pub(crate) fn network_msg(msg: impl Into<String>) -> Self {
Error::Network {
msg: msg.into(),
source: None,
}
}
pub(crate) fn bake_msg(msg: impl Into<String>) -> Self {
Error::Bake {
msg: msg.into(),
source: None,
}
}
pub(crate) fn cache_miss(msg: impl Into<String>) -> Self {
Error::CacheMiss { msg: msg.into() }
}
pub(crate) fn cache_invalid(msg: impl Into<String>) -> Self {
Error::CacheInvalid { msg: msg.into() }
}
pub(crate) fn image_not_found(image: impl Into<String>, msg: impl Into<String>) -> Self {
Error::ImageNotFound {
image: image.into(),
msg: msg.into(),
source: None,
}
}
pub(crate) fn registry_auth(image: impl Into<String>, msg: impl Into<String>) -> Self {
Error::RegistryAuth {
image: image.into(),
msg: msg.into(),
source: None,
}
}
pub(crate) fn registry_unreachable(msg: impl Into<String>) -> Self {
Error::RegistryUnreachable {
msg: msg.into(),
source: None,
}
}
pub(crate) fn pool_exhausted(msg: impl Into<String>) -> Self {
Error::PoolExhausted { msg: msg.into() }
}
pub(crate) fn kernel_panic(first_line: impl Into<String>, stack: Vec<String>) -> Self {
Error::KernelPanic {
first_line: first_line.into(),
stack,
}
}
pub(crate) fn is_likely_multi_vcpu_restore_stall(&self) -> bool {
let Error::Vm { msg, .. } = self else {
return false;
};
msg.contains("Resource temporarily unavailable")
|| msg.contains("os error 35")
|| msg.contains("os error 60") || msg.contains("timed out")
|| (msg.contains("probe failed") && !msg.contains("speaks protocol v"))
}
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, source } => f
.debug_struct("Image")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Vm { msg, source } => f
.debug_struct("Vm")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Assets { msg, source } => f
.debug_struct("Assets")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Io(e) => f.debug_tuple("Io").field(e).finish(),
Error::Network { msg, source } => f
.debug_struct("Network")
.field("msg", msg)
.field("source", source)
.finish(),
Error::CacheMiss { msg } => f.debug_struct("CacheMiss").field("msg", msg).finish(),
Error::CacheInvalid { msg } => {
f.debug_struct("CacheInvalid").field("msg", msg).finish()
}
Error::Bake { msg, source } => f
.debug_struct("Bake")
.field("msg", msg)
.field("source", source)
.finish(),
Error::ImageNotFound { image, msg, source } => f
.debug_struct("ImageNotFound")
.field("image", image)
.field("msg", msg)
.field("source", source)
.finish(),
Error::RegistryAuth { image, msg, source } => f
.debug_struct("RegistryAuth")
.field("image", image)
.field("msg", msg)
.field("source", source)
.finish(),
Error::RegistryUnreachable { msg, source } => f
.debug_struct("RegistryUnreachable")
.field("msg", msg)
.field("source", source)
.finish(),
Error::PoolExhausted { msg } => {
f.debug_struct("PoolExhausted").field("msg", msg).finish()
}
Error::KernelPanic { first_line, stack } => f
.debug_struct("KernelPanic")
.field("first_line", first_line)
.field("stack_lines", &stack.len())
.finish(),
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, .. } => write!(f, "image: {msg}"),
Error::Vm { msg, .. } => write!(f, "vm: {msg}"),
Error::Assets { msg, .. } => write!(f, "assets: {msg}"),
Error::Io(e) => write!(f, "io: {e}"),
Error::Network { msg, .. } => write!(f, "network: {msg}"),
Error::CacheMiss { msg } => write!(f, "cache miss: {msg}"),
Error::CacheInvalid { msg } => write!(f, "cache invalid: {msg}"),
Error::Bake { msg, .. } => write!(f, "bake: {msg}"),
Error::ImageNotFound { image, msg, .. } => {
write!(f, "image not found ({image}): {msg}")
}
Error::RegistryAuth { image, msg, .. } => {
write!(f, "registry auth failed for {image}: {msg}")
}
Error::RegistryUnreachable { msg, .. } => {
write!(f, "registry unreachable: {msg}")
}
Error::PoolExhausted { msg } => write!(f, "pool exhausted: {msg}"),
Error::KernelPanic { first_line, stack } => {
writeln!(f, "kernel panic during bake: {first_line}")?;
for line in stack {
writeln!(f, " {line}")?;
}
Ok(())
}
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Image { source, .. }
| Error::Vm { source, .. }
| Error::Assets { source, .. }
| Error::Network { source, .. }
| Error::Bake { source, .. }
| Error::ImageNotFound { source, .. }
| Error::RegistryAuth { source, .. }
| Error::RegistryUnreachable { source, .. } => source
.as_ref()
.map(|s| s.as_ref() as &(dyn std::error::Error + 'static)),
Error::Io(e) => Some(e),
Error::CacheMiss { .. }
| Error::CacheInvalid { .. }
| Error::PoolExhausted { .. }
| Error::KernelPanic { .. } => None,
}
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
}
}
/// Converts a warm-pool failure into `Error::Vm`, using its `Display` text
/// as the message and keeping the original error as the source.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
impl From<WarmPoolError> for Error {
    fn from(pool_err: WarmPoolError) -> Self {
        // Render the message before the error is moved into the box.
        let msg = pool_err.to_string();
        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(pool_err);
        Error::Vm {
            msg,
            source: Some(boxed),
        }
    }
}
/// Converts a pool-client failure into `Error::Vm`, using its `Display`
/// text as the message and keeping the original error as the source.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
impl From<PoolClientError> for Error {
    fn from(client_err: PoolClientError) -> Self {
        // Render the message before the error is moved into the box.
        let msg = client_err.to_string();
        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(client_err);
        Error::Vm {
            msg,
            source: Some(boxed),
        }
    }
}
/// Image pull policy. The default is [`PullPolicy::Missing`].
///
/// NOTE(review): the precise pull semantics of each variant are implemented
/// elsewhere; only the names and their string forms are defined here.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum PullPolicy {
    Always,
    #[default]
    Missing,
    Never,
}

impl PullPolicy {
    /// Lower-case name of the policy: `"always"`, `"missing"` or `"never"`.
    fn as_bake_str(self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::Missing => "missing",
            Self::Never => "never",
        }
    }
}
/// Optional file paths attached to an `Image` through its `kvm` field.
///
/// Every part is optional. `Image::is_kvm_cold_boot` treats a missing
/// `snapshot` as a cold boot.
#[derive(Debug, Clone)]
pub(crate) struct KvmImageParts {
// NOTE(review): presumably the guest kernel image — confirm against the KVM backend.
pub(crate) kernel: Option<PathBuf>,
// NOTE(review): presumably the initial ramdisk — confirm.
pub(crate) initrd: Option<PathBuf>,
// NOTE(review): presumably the root disk image — confirm.
pub(crate) disk: Option<PathBuf>,
// `None` here is what `is_kvm_cold_boot` reports as a cold boot.
pub(crate) snapshot: Option<PathBuf>,
}
/// A baked image: a snapshot path plus the VM configuration and per-image
/// shared state (pools, warm-worker stash) that travel with it.
///
/// Several fields have the same names and types as fields of `SpawnConfig`
/// in this file.
// NOTE(review): the type derives `Clone`; `warm_baked_worker` is an `Arc`, so
// clones share it. How `hidden_pool` behaves on clone depends on `OnceLock`'s
// `Clone` impl — confirm this is the intended sharing model.
#[derive(Debug, Clone)]
pub struct Image {
// Path of the snapshot this image was built around.
snapshot_path: PathBuf,
// Guest memory size in MiB.
pub(crate) memory_mib: u32,
// Guest vCPU count.
pub(crate) vcpus: u32,
// NOTE(review): presumably a 16-hex-char hash of the worker binary used at
// bake time — confirm against the bake code.
pub(crate) baker_runtime_sha16: Option<String>,
pub(crate) balloon_target_pages: Option<u32>,
pub(crate) tsi_token: Option<String>,
pub(crate) egress_policy: Option<String>,
pub(crate) pre_exec_sync: bool,
pub(crate) layers: Vec<PathBuf>,
pub(crate) delta_squashfs: Option<PathBuf>,
pub(crate) mounts: Vec<crate::vmm::resources::MountSpec>,
// Same tuple shape as `SpawnConfig::volumes`, where it is destructured as
// (host file, guest path, size in bytes, optional pristine copy).
pub(crate) volumes: Vec<(PathBuf, String, u64, Option<PathBuf>)>,
pub(crate) bundled_kernel: Option<PathBuf>,
// KVM-specific parts; `None` when the image has none.
pub(crate) kvm: Option<KvmImageParts>,
// Set-once slot for the image's hidden pool.
pub(crate) hidden_pool: std::sync::OnceLock<Arc<HiddenPool>>,
// Shared stash that may hold a worker left over from baking; inspected by
// `_warm_handoff_present` / `_warm_handoff_pid`.
pub(crate) warm_baked_worker: Arc<crate::bake::WarmStash>,
// Set-once slot for the KVM pool (Linux/x86_64 builds only).
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub(crate) kvm_pool: std::sync::OnceLock<Arc<Pool>>,
}
impl Image {
    /// True when the image has KVM parts but no snapshot among them.
    #[doc(hidden)]
    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
    pub fn is_kvm_cold_boot(&self) -> bool {
        match &self.kvm {
            Some(parts) => parts.snapshot.is_none(),
            None => false,
        }
    }

    /// True when the warm-worker stash currently holds a worker. A poisoned
    /// stash mutex is reported as `false`.
    #[doc(hidden)]
    pub fn _warm_handoff_present(&self) -> bool {
        match self.warm_baked_worker.inner.lock() {
            Ok(slot) => slot.is_some(),
            Err(_) => false,
        }
    }

    /// Process id of the stashed warm worker, if one is present. A poisoned
    /// stash mutex is reported as `None`.
    #[doc(hidden)]
    pub fn _warm_handoff_pid(&self) -> Option<u32> {
        let slot = self.warm_baked_worker.inner.lock().ok()?;
        let pid = slot.as_ref().map(|bw| bw.child.id());
        pid
    }
}
/// Worker pool stored in `Image::hidden_pool`.
///
/// Pool state lives behind an `Arc<Mutex<_>>` paired with a `Condvar`.
// NOTE(review): the acquire/release protocol is implemented outside this
// view; the field notes below are inferred from names and types — confirm.
#[doc(hidden)]
pub struct HiddenPool {
// Idle workers and counters, guarded by one mutex.
state: Arc<Mutex<PoolState>>,
// Condvar paired with `state` (presumably signalled when a worker frees up).
available: Arc<Condvar>,
// Optional queue of workers (presumably awaiting restore before reuse — confirm).
dirty: Option<Arc<Mutex<VecDeque<Worker>>>>,
// Condvar paired with `dirty`.
dirty_pending: Option<Arc<Condvar>>,
socks_dir: PathBuf,
// Flag presumably set once pool teardown has started.
shutting_down: Arc<AtomicBool>,
// Shared configuration used to spawn new workers.
spawn_cfg: Arc<SpawnConfig>,
policy: PoolPolicy,
}
/// Tunables for a worker pool.
///
/// NOTE(review): how each knob is enforced lives in the pool implementation,
/// outside this block; the per-field notes are inferred from the names.
#[derive(Debug, Clone, Copy)]
struct PoolPolicy {
    // Presumably the minimum number of workers to keep — confirm.
    min: usize,
    // Presumably the maximum number of live workers — confirm.
    max: usize,
    // Presumably how long an idle worker may linger — confirm.
    idle_timeout: Duration,
    // Presumably the acquire wait limit; `None` likely means unbounded — confirm.
    acquire_timeout: Option<Duration>,
    restore_on_release: bool,
}

impl Default for PoolPolicy {
    /// Defaults: `min = 0`, `max = 64`, 60 s idle timeout, 60 s acquire
    /// timeout, and `restore_on_release = true`.
    fn default() -> Self {
        let one_minute = Duration::from_secs(60);
        PoolPolicy {
            min: 0,
            max: 64,
            idle_timeout: one_minute,
            acquire_timeout: Some(one_minute),
            restore_on_release: true,
        }
    }
}
/// Mutable pool bookkeeping; held behind `HiddenPool::state`'s mutex.
// NOTE(review): counter semantics are inferred from the names — confirm
// against the pool's acquire/release code.
struct PoolState {
// Workers currently idle, each with a last-used timestamp.
idle: Vec<IdleEntry>,
// Presumably the number of live workers (idle + checked out).
alive: usize,
// Presumably the number of callers blocked waiting for a worker.
waiting: usize,
// Presumably a running count of workers reaped so far.
reaped: u64,
}
/// An idle worker paired with a timestamp; stored in `PoolState::idle`.
struct IdleEntry {
worker: Worker,
// NOTE(review): presumably when the worker was last returned to the pool — confirm.
last_used: Instant,
}
/// A spawned worker process together with the socket paths and state needed
/// to drive and later tear it down (see `Worker::shutdown`).
struct Worker {
// Handle to the worker process; polled and, if necessary, killed in `shutdown`.
child: Child,
// Passed to the worker as `--vsock-mux`; removed on shutdown.
vsock_mux_path: PathBuf,
// Passed to the worker as `--vsock-exec`; agent control RPCs and execs are
// sent to this path. Removed on shutdown.
vsock_exec_path: PathBuf,
// Filesystem path of the control socket; removed on shutdown.
control_path: PathBuf,
// Line-oriented control connection to the worker (RESTORE/SNAPSHOT/QUIT).
control: Arc<Mutex<ControlChannel>>,
// Initialised to the spawn-time snapshot path by `spawn_one`.
last_restore_path: PathBuf,
memory_mib: u32,
// `spawn_one` sets this when the config has mounts OR volumes.
has_mounts: bool,
vcpus: u32,
// Per-worker volume copies created at spawn; deleted on shutdown.
volume_temp_files: Vec<PathBuf>,
// Guest paths of the configured volumes.
volume_guest_paths: Vec<String>,
// `spawn_one` sets this when at least one temp volume copy was created;
// `warm_baked_to_worker` sets it from its `use_once` argument.
has_cow_volumes: bool,
// Memory-admission guard owned by the worker; never read, only held.
_admission: crate::memory_admission::AdmissionGuard,
}
/// Counters parsed from a worker's `DONE_SNAPSHOT…` reply. Each field is
/// filled from the matching `key=value` token and stays 0 when the token is
/// missing or unparsable.
struct SnapshotStats {
// From `bytes_written=`.
bytes_written: u64,
// From `capture_us=` (the name suggests microseconds).
capture_us: u64,
// From `save_us=` (the name suggests microseconds).
save_us: u64,
}
/// Line-oriented control connection to a worker: a buffered reader and a
/// separate writer handle. `spawn_one` builds both from the same accepted
/// stream, the writer via `try_clone`.
struct ControlChannel {
reader: std::io::BufReader<std::os::unix::net::UnixStream>,
writer: std::os::unix::net::UnixStream,
}
impl ControlChannel {
    /// Writes `line` to the worker, appending a newline if it lacks one, and
    /// flushes the writer.
    fn send_line(&mut self, line: &str) -> std::io::Result<()> {
        use std::io::Write;
        let out = &mut self.writer;
        out.write_all(line.as_bytes())?;
        let already_terminated = line.ends_with('\n');
        if !already_terminated {
            out.write_all(b"\n")?;
        }
        out.flush()
    }

    /// Reads the next reply line (trailing newline included), skipping
    /// out-of-band notifications: `SAVE_DONE …`, `SAVE_FAIL …` and
    /// `MARKER_KCACHE_READY…`.
    ///
    /// # Errors
    /// Returns `UnexpectedEof` when the socket is closed, or any read error.
    fn read_line(&mut self) -> std::io::Result<String> {
        use std::io::BufRead;
        loop {
            let mut line = String::new();
            if self.reader.read_line(&mut line)? == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "worker control socket closed",
                ));
            }
            let body = line.trim_start();
            let is_bg_save_notice = ["SAVE_DONE ", "SAVE_FAIL "]
                .iter()
                .any(|prefix| body.starts_with(*prefix));
            if is_bg_save_notice {
                tracing::debug!(line = %line.trim_end(), "skipping orthogonal bg-save notification");
                continue;
            }
            if body.starts_with("MARKER_KCACHE_READY") {
                tracing::debug!("skipping MARKER_KCACHE_READY notification");
                continue;
            }
            return Ok(line);
        }
    }
}
/// Wraps a worker left over from baking into a pool `Worker`.
///
/// The process handle, socket paths and control streams come from `bw`. The
/// worker starts with no temp volume files, `has_cow_volumes` is taken from
/// `use_once`, and admission is charged for `memory_mib` plus the fixed
/// worker overhead.
#[cfg(target_os = "macos")]
fn warm_baked_to_worker(
    bw: crate::bake::BakedWorker,
    memory_mib: u32,
    has_mounts: bool,
    volume_guest_paths: Vec<String>,
    vcpus: u32,
    use_once: bool,
) -> Worker {
    let channel = ControlChannel {
        reader: std::io::BufReader::new(bw.control_reader),
        writer: bw.control_writer,
    };
    let charged_mib = memory_mib as u64 + crate::memory_admission::WORKER_OVERHEAD_MIB;
    let admission = crate::memory_admission::charge(charged_mib);
    Worker {
        child: bw.child,
        vsock_mux_path: bw.vsock_mux_path,
        vsock_exec_path: bw.vsock_exec_path,
        control_path: bw.control_path,
        control: Arc::new(Mutex::new(channel)),
        last_restore_path: bw.last_restore_path,
        memory_mib,
        has_mounts,
        vcpus,
        volume_temp_files: Vec::new(),
        volume_guest_paths,
        has_cow_volumes: use_once,
        _admission: admission,
    }
}
/// Polls the guest's `/proc/mounts` (via `cat` over the exec socket) until
/// every path in `guest_paths` appears as a mount point.
///
/// Retries every 10 ms within a 3 s budget; the budget is checked after each
/// attempt, so at least one attempt always runs.
///
/// # Errors
/// Fails if an exec attempt itself errors, or if the budget elapses with a
/// path still absent.
#[cfg(target_os = "macos")]
fn wait_for_volumes_mounted(exec_path: &Path, guest_paths: &[String]) -> Result<(), Error> {
    let started = std::time::Instant::now();
    let budget = std::time::Duration::from_secs(3);
    loop {
        let attempt = crate::exec::ExecBuilder::new(exec_path.to_path_buf())
            .argv(["cat", "/proc/mounts"])
            .timeout(std::time::Duration::from_secs(3))
            .output();
        let out = match attempt {
            Ok(out) => out,
            Err(e) => {
                return Err(Error::vm_msg(format!(
                    "post-restore volume-readiness exec failed in {:?}: {e} — \
                     worker will be respawned",
                    started.elapsed()
                )));
            }
        };
        if out.status.code() == Some(0) {
            let table = String::from_utf8_lossy(&out.stdout);
            // Second space-separated column of each row is the mount point.
            let is_mounted = |guest_path: &String| {
                table
                    .lines()
                    .any(|row| row.split(' ').nth(1) == Some(guest_path.as_str()))
            };
            let first_missing = guest_paths.iter().find(|gp| !is_mounted(gp));
            if first_missing.is_none() {
                return Ok(());
            }
        }
        if started.elapsed() >= budget {
            return Err(Error::vm_msg(format!(
                "post-restore volumes not mounted within {budget:?}: {guest_paths:?} \
                 absent from /proc/mounts — worker will be respawned"
            )));
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
}
impl Worker {
/// Restores this worker from `snapshot_path` and checks that the guest
/// agent responds afterwards.
///
/// Sequence:
/// 1. Send `RESTORE <path>` on the control channel and require a reply
///    starting with `DONE`.
/// 2. For multi-vCPU workers, send `SMPARK_UNPARK_DIRECT` and consume one
///    reply line (its content is ignored).
/// 3. Unless `SUPERMACHINE_POST_RESTORE_PROBE` is `0`/`false`: probe the
///    agent (up to 10 attempts, 200 ms timeout each), send a best-effort
///    `smpark_unpark` for multi-vCPU workers, then — unless
///    `SUPERMACHINE_POST_RESTORE_LIVENESS` is `0`/`false` — run `/bin/true`
///    in the guest with a 3 s timeout.
/// 4. On macOS/aarch64, wait for the configured volumes to be mounted.
/// 5. Best-effort: drop VFS caches (only when `has_mounts`) and sync time.
///
/// # Errors
/// Fails on a non-UTF-8 path, a poisoned control mutex, control-socket I/O
/// errors, a non-`DONE` reply, probe exhaustion, a failed liveness exec, or
/// a volume-mount timeout.
fn send_restore(&self, snapshot_path: &Path) -> Result<(), Error> {
let path_str = snapshot_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("RESTORE {path_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if !line.starts_with("DONE") {
return Err(Error::vm_msg(format!(
"worker RESTORE: expected DONE response, got: {}",
line.trim()
)));
}
if self.vcpus > 1 {
ctl.send_line("SMPARK_UNPARK_DIRECT").map_err(Error::Io)?;
// The reply is consumed to keep the channel in sync; its content is ignored.
let _ = ctl.read_line().map_err(Error::Io)?;
}
// Release the control lock before the agent RPCs below.
drop(ctl);
// Probe is on unless the variable is exactly "0" or "false".
let probe_enabled = std::env::var("SUPERMACHINE_POST_RESTORE_PROBE")
.map(|v| v != "0" && v != "false")
.unwrap_or(true);
if probe_enabled {
let per_attempt = std::time::Duration::from_millis(200);
let max_attempts = 10;
for attempt in 0..max_attempts {
let body = serde_json::json!({ "action": "probe" });
match crate::exec::send_control_with_ack(
&self.vsock_exec_path,
&body,
Some(per_attempt),
) {
Ok(_) => break,
// Not the last attempt yet: retry immediately (no sleep between attempts).
Err(_) if attempt + 1 < max_attempts => {}
Err(e) => {
return Err(Error::vm_msg(format!(
"post-restore agent probe failed after {max_attempts} attempts: {e}"
)));
}
}
}
if self.vcpus > 1 {
// Best-effort: the result is deliberately discarded.
let body = serde_json::json!({ "action": "smpark_unpark" });
let _ = crate::exec::send_control_with_ack(
&self.vsock_exec_path,
&body,
Some(std::time::Duration::from_millis(500)),
);
}
// The liveness check is nested under the probe switch: disabling the
// probe also skips it.
let liveness_enabled = std::env::var("SUPERMACHINE_POST_RESTORE_LIVENESS")
.map(|v| v != "0" && v != "false")
.unwrap_or(true);
if liveness_enabled {
let liveness_t0 = std::time::Instant::now();
let exec_res = crate::exec::ExecBuilder::new(self.vsock_exec_path.clone())
.argv(["/bin/true"])
.timeout(std::time::Duration::from_secs(3))
.output();
match exec_res {
Ok(out) if out.status.code() == Some(0) => {
}
Ok(out) => {
// Only the first 200 characters of stderr go into the message.
return Err(Error::vm_msg(format!(
"post-restore liveness exec returned non-zero in {:?}: exit={:?} \
stderr={:?} — worker will be respawned",
liveness_t0.elapsed(),
out.status.code(),
String::from_utf8_lossy(&out.stderr)
.chars()
.take(200)
.collect::<String>(),
)));
}
Err(e) => {
return Err(Error::vm_msg(format!(
"post-restore liveness exec failed in {:?}: {e} — worker will be respawned",
liveness_t0.elapsed(),
)));
}
}
}
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
if !self.volume_guest_paths.is_empty() {
wait_for_volumes_mounted(&self.vsock_exec_path, &self.volume_guest_paths)?;
}
if self.has_mounts {
// Best-effort: failure to drop caches does not fail the restore.
let _ = drop_vfs_caches_via_agent(&self.vsock_exec_path, self.memory_mib);
}
// Best-effort: failure to sync time does not fail the restore.
let _ = sync_time_via_agent(&self.vsock_exec_path);
Ok(())
}
fn send_snapshot(&self, out_path: &Path) -> Result<SnapshotStats, Error> {
let path_str = out_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("SNAPSHOT {path_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT") {
let mut stats = SnapshotStats {
bytes_written: 0,
capture_us: 0,
save_us: 0,
};
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("bytes_written=") {
stats.bytes_written = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("capture_us=") {
stats.capture_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("save_us=") {
stats.save_us = v.parse().unwrap_or(0);
}
}
Ok(stats)
} else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
Err(Error::vm_msg(format!(
"worker SNAPSHOT failed: {}",
rest.trim()
)))
} else {
Err(Error::vm_msg(format!(
"worker SNAPSHOT: unexpected response: {}",
line.trim()
)))
}
}
fn send_snapshot_with_base(
&self,
out_path: &Path,
base_path: &Path,
inline: bool,
) -> Result<SnapshotStats, Error> {
let path_str = out_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot out path is not valid UTF-8".to_owned()))?;
let base_str = base_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot base path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
let line = if inline {
format!("SNAPSHOT {path_str} base={base_str} inline")
} else {
format!("SNAPSHOT {path_str} base={base_str}")
};
ctl.send_line(&line).map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT") {
let mut stats = SnapshotStats {
bytes_written: 0,
capture_us: 0,
save_us: 0,
};
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("bytes_written=") {
stats.bytes_written = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("capture_us=") {
stats.capture_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("save_us=") {
stats.save_us = v.parse().unwrap_or(0);
}
}
Ok(stats)
} else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
Err(Error::vm_msg(format!(
"worker SNAPSHOT (with base) failed: {}",
rest.trim()
)))
} else {
Err(Error::vm_msg(format!(
"worker SNAPSHOT (with base): unexpected response: {}",
line.trim()
)))
}
}
#[allow(dead_code)]
fn send_snapshot_async(&self, out_path: &Path) -> Result<SnapshotStats, Error> {
let path_str = out_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("SNAPSHOT_ASYNC {path_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT_ASYNC") {
let mut stats = SnapshotStats {
bytes_written: 0,
capture_us: 0,
save_us: 0,
};
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("bytes_written=") {
stats.bytes_written = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("capture_us=") {
stats.capture_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("save_us=") {
stats.save_us = v.parse().unwrap_or(0);
}
}
Ok(stats)
} else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
Err(Error::vm_msg(format!(
"worker SNAPSHOT_ASYNC failed: {}",
rest.trim()
)))
} else {
Err(Error::vm_msg(format!(
"worker SNAPSHOT_ASYNC: unexpected response: {}",
line.trim()
)))
}
}
/// Sends the `smpark_park` control action to the guest agent (5 s timeout).
///
/// Never fails: returns `Ok(true)` when the agent acknowledged, and
/// `Ok(false)` (with a debug log) when the RPC errored.
#[cfg_attr(
    not(all(target_os = "macos", target_arch = "aarch64")),
    allow(dead_code)
)]
fn send_smpark_park(&self) -> Result<bool, Error> {
    let request = serde_json::json!({ "action": "smpark_park" });
    let ack = crate::exec::send_control_with_ack(
        &self.vsock_exec_path,
        &request,
        Some(Duration::from_secs(5)),
    );
    if let Err(e) = &ack {
        tracing::debug!(error = %e, "smpark_park unavailable; skipping");
    }
    Ok(ack.is_ok())
}
/// Sends the `smpark_unpark` control action to the guest agent (5 s
/// timeout).
///
/// Never fails: returns `Ok(true)` when the agent acknowledged, and
/// `Ok(false)` (with a debug log) when the RPC errored.
#[cfg_attr(
    not(all(target_os = "macos", target_arch = "aarch64")),
    allow(dead_code)
)]
fn send_smpark_unpark(&self) -> Result<bool, Error> {
    let request = serde_json::json!({ "action": "smpark_unpark" });
    let ack = crate::exec::send_control_with_ack(
        &self.vsock_exec_path,
        &request,
        Some(Duration::from_secs(5)),
    );
    if let Err(e) = &ack {
        tracing::debug!(error = %e, "smpark_unpark unavailable; skipping");
    }
    Ok(ack.is_ok())
}
/// Stops the worker and removes its on-disk artefacts.
///
/// Sends `QUIT` (best-effort) and polls the process every 2 ms for up to
/// 100 ms. If it has not exited by then, or polling fails, the process is
/// killed and reaped. Afterwards the mux/exec/control sockets, the
/// `.handoff` sibling of the mux socket and any temp volume files are
/// deleted. All removal errors are ignored.
fn shutdown(&mut self) {
    if let Ok(mut ctl) = self.control.lock() {
        let _ = ctl.send_line("QUIT");
    }

    let grace_ends = Instant::now() + Duration::from_millis(100);
    let exited_on_its_own = loop {
        match self.child.try_wait() {
            Ok(Some(_)) => break true,
            Ok(None) if Instant::now() < grace_ends => {
                std::thread::sleep(Duration::from_millis(2));
            }
            // Grace period over, or `try_wait` itself failed.
            _ => break false,
        }
    };
    if !exited_on_its_own {
        let _ = self.child.kill();
        let _ = self.child.wait();
    }

    let handoff_path = self.vsock_mux_path.with_extension("handoff");
    let socket_paths = [
        &self.vsock_mux_path,
        &self.vsock_exec_path,
        &self.control_path,
        &handoff_path,
    ];
    for path in socket_paths {
        let _ = std::fs::remove_file(path);
    }
    for temp_volume in self.volume_temp_files.drain(..) {
        let _ = std::fs::remove_file(&temp_volume);
    }
}
}
/// Everything `spawn_one` needs to launch a worker process and restore it
/// from a snapshot.
struct SpawnConfig {
// High-water mark of observed worker footprint in MiB; updated with
// `fetch_max` after each successful spawn and read by `admission_charge_mib`.
observed_footprint_mib: std::sync::atomic::AtomicU64,
// Path of the worker executable to launch.
worker_bin: PathBuf,
// Snapshot passed as `--restore-from` and in the initial `RESTORE` command.
snapshot_path: PathBuf,
// Each entry becomes a `--virtio-blk` argument.
layers: Vec<PathBuf>,
// When present, appended as one more `--virtio-blk` argument after `layers`.
delta_squashfs: Option<PathBuf>,
// Each entry becomes `--mount host_path:guest_tag:guest_path`.
mounts: Vec<crate::vmm::resources::MountSpec>,
// (host file, guest path, size in bytes, optional pristine copy). When any
// entry has a pristine copy, entries with one are launched from a per-spawn
// temp copy of it.
volumes: Vec<(PathBuf, String, u64, Option<PathBuf>)>,
// Passed as `--memory`.
memory_mib: u32,
// Passed as `--vcpus`.
vcpus: u32,
// Directory in which the per-worker socket paths are created.
socks_dir: PathBuf,
// Prefix of the socket file names (`<prefix>-<suffix>.sock`).
name_prefix: String,
// Upper bound for waiting on the snapshot file and on the control connection.
spawn_timeout: Duration,
// Compared with the current worker binary's hash; on a match the agent
// protocol probe is skipped.
baker_runtime_sha16: Option<String>,
// Passed as `--balloon-target-pages` when set.
balloon_target_pages: Option<u32>,
// Passed as `--tsi-token` when set.
tsi_token: Option<String>,
// Passed as `--egress-policy` when set.
egress_policy: Option<String>,
// When true, the worker gets `SUPERMACHINE_PRE_EXEC_SYNC_RESUME=1`.
pre_exec_sync: bool,
// NOTE(review): not read by `spawn_one`; presumably consumed by the pool's
// release path — confirm.
restore_on_release: bool,
}
/// True when `SUPERMACHINE_ADMISSION_CHARGE` is set to `rss` (surrounding
/// whitespace and ASCII case are ignored). Unset or unreadable counts as
/// disabled.
fn rss_admission_charge_enabled() -> bool {
    match std::env::var("SUPERMACHINE_ADMISSION_CHARGE") {
        Ok(mode) => mode.trim().eq_ignore_ascii_case("rss"),
        Err(_) => false,
    }
}
/// Computes the admission charge (MiB) for a worker from its observed
/// footprint.
///
/// * `observed_mib == 0` means "not measured yet": charge the full cap,
///   `memory_mib + overhead`.
/// * Otherwise charge `observed + 25% + overhead`, clamped to
///   `[memory_mib / 2 + overhead, memory_mib + overhead]`.
///
/// All additions saturate, so an absurdly large observation or overhead
/// clamps to the cap instead of overflowing.
fn rss_charge_mib(memory_mib: u32, observed_mib: u64, overhead: u64) -> u64 {
    let memory = u64::from(memory_mib);
    let cap = memory.saturating_add(overhead);
    if observed_mib == 0 {
        return cap;
    }
    // `floor <= cap` always holds (memory / 2 <= memory), so `clamp` cannot panic.
    let floor = (memory / 2).saturating_add(overhead);
    let margined = observed_mib
        .saturating_add(observed_mib / 4)
        .saturating_add(overhead);
    margined.clamp(floor, cap)
}

#[cfg(test)]
mod rss_charge_tests {
    use super::rss_charge_mib;
    const OH: u64 = 64;

    #[test]
    fn unmeasured_charges_full_cap() {
        assert_eq!(rss_charge_mib(512, 0, OH), 512 + OH);
    }

    #[test]
    fn measured_charges_observed_plus_margin() {
        // 300 + 300/4 = 375
        assert_eq!(rss_charge_mib(512, 300, OH), 375 + OH);
    }

    #[test]
    fn floored_at_half_cap_when_footprint_tiny() {
        // 100 + 25 = 125 is below the 256 floor
        assert_eq!(rss_charge_mib(512, 100, OH), 256 + OH);
    }

    #[test]
    fn never_exceeds_the_cap() {
        assert_eq!(rss_charge_mib(512, 1000, OH), 512 + OH);
    }

    #[test]
    fn huge_observation_saturates_instead_of_overflowing() {
        assert_eq!(rss_charge_mib(512, u64::MAX, OH), 512 + OH);
    }
}
impl SpawnConfig {
/// Memory (MiB) to charge against admission for one worker.
///
/// With RSS charging disabled this is `memory_mib` plus the fixed worker
/// overhead. With it enabled, the charge is derived from the observed
/// footprint high-water mark via `rss_charge_mib`.
fn admission_charge_mib(&self) -> u64 {
    let overhead = crate::memory_admission::WORKER_OVERHEAD_MIB;
    if rss_admission_charge_enabled() {
        let observed = self
            .observed_footprint_mib
            .load(std::sync::atomic::Ordering::Relaxed);
        rss_charge_mib(self.memory_mib, observed, overhead)
    } else {
        self.memory_mib as u64 + overhead
    }
}
/// Spawns one worker process, restores it from `self.snapshot_path` and
/// returns it ready for use.
///
/// Steps:
/// 1. Derive unique mux/exec/control socket paths and bind the control
///    listener.
/// 2. If the snapshot file is not there yet, poll for it (up to
///    `spawn_timeout`).
/// 3. Build the worker command line (layers, mounts, volumes — using
///    per-spawn temp copies for volumes with a pristine copy — plus memory,
///    vCPUs, sockets and optional flags) and configure its stdio.
/// 4. Acquire memory admission and a spawn permit, then launch the process.
/// 5. Accept the control connection, expect `READY`, send `RESTORE` and
///    expect `DONE`.
/// 6. Post-restore: `smpark_unpark` for multi-vCPU guests, best-effort cache
///    drop and time sync, the agent protocol probe (skipped when the baked
///    runtime hash matches the current worker binary) and, on
///    macOS/aarch64, waiting for volumes to be mounted.
///
/// # Errors
/// Any failed step returns `Error::Vm` (or the probe's own error). On every
/// failure path the child, if spawned, is killed and reaped, and the temp
/// volume files, control socket, vsock sockets and `.handoff` file are
/// removed.
fn spawn_one(&self) -> Result<Worker, Error> {
    use std::os::unix::net::UnixListener;

    /// Failure-path guard: while `armed`, dropping it kills and reaps the
    /// child (if any) and deletes every path in `cleanup_paths`.
    struct SpawnCleanup {
        child: Option<std::process::Child>,
        cleanup_paths: Vec<PathBuf>,
        armed: bool,
    }
    impl Drop for SpawnCleanup {
        fn drop(&mut self) {
            if !self.armed {
                return;
            }
            if let Some(mut c) = self.child.take() {
                let _ = c.kill();
                let _ = c.wait();
            }
            for p in &self.cleanup_paths {
                let _ = std::fs::remove_file(p);
            }
        }
    }

    let suffix = unique_suffix();
    let vsock_mux_path = self
        .socks_dir
        .join(format!("{}-{:016x}.sock", self.name_prefix, suffix));
    let vsock_exec_path = {
        let mut p = vsock_mux_path.clone();
        let mut name = p
            .file_name()
            .expect("mux path was just joined with a file name")
            .to_owned();
        name.push("-exec");
        p.set_file_name(name);
        p
    };
    let control_path = {
        let mut p = vsock_mux_path.clone();
        let mut name = p
            .file_name()
            .expect("mux path was just joined with a file name")
            .to_owned();
        name.push("-ctl");
        p.set_file_name(name);
        p
    };
    // Clear any stale files at the paths we are about to use.
    let _ = std::fs::remove_file(&vsock_mux_path);
    let _ = std::fs::remove_file(&vsock_exec_path);
    let _ = std::fs::remove_file(&control_path);
    let ctl_listener = UnixListener::bind(&control_path).map_err(|e| {
        Error::vm_msg(format!(
            "bind control socket {}: {e}",
            control_path.display()
        ))
    })?;

    // The snapshot may still be landing on disk; wait for it with a capped
    // exponential backoff.
    if !self.snapshot_path.is_file() {
        let poll_t0 = Instant::now();
        let poll_deadline = poll_t0 + self.spawn_timeout;
        let mut backoff = Duration::from_millis(2);
        loop {
            if self.snapshot_path.is_file() {
                if crate::trace::enabled("timings") {
                    eprintln!(
                        "[spawn_one] waited {:?} for snapshot file to land at {}",
                        poll_t0.elapsed(),
                        self.snapshot_path.display(),
                    );
                }
                break;
            }
            if Instant::now() > poll_deadline {
                let _ = std::fs::remove_file(&control_path);
                return Err(Error::vm_msg(format!(
                    "worker spawn: snapshot file {} did not appear within {:?} \
                     (bg save in flight from a pipelined bake — increase \
                     `VmConfig::restore_timeout` if your disk is slow)",
                    self.snapshot_path.display(),
                    self.spawn_timeout
                )));
            }
            std::thread::sleep(backoff);
            backoff = (backoff * 2).min(Duration::from_millis(50));
        }
    }

    let mut cmd = Command::new(&self.worker_bin);
    for layer in &self.layers {
        cmd.arg("--virtio-blk").arg(layer);
    }
    if let Some(delta) = &self.delta_squashfs {
        cmd.arg("--virtio-blk").arg(delta);
    }
    for m in &self.mounts {
        cmd.arg("--mount")
            .arg(format!("{}:{}:{}", m.host_path, m.guest_tag, m.guest_path));
    }

    // Per-spawn temp copies are used as soon as ANY volume has a pristine copy.
    let cow_volumes_enabled = self.volumes.iter().any(|(_, _, _, p)| p.is_some());
    let mut volume_temp_files: Vec<PathBuf> = Vec::new();
    for (i, (host_file, guest_path, size_bytes, pristine)) in self.volumes.iter().enumerate() {
        let effective_host = if cow_volumes_enabled {
            match pristine.as_ref() {
                Some(pristine_path) => match cow_volume_for_spawn(pristine_path, i, suffix) {
                    Ok(temp) => {
                        volume_temp_files.push(temp.clone());
                        temp
                    }
                    Err(e) => {
                        // Undo the copies made so far before bailing out.
                        for p in &volume_temp_files {
                            let _ = std::fs::remove_file(p);
                        }
                        let _ = std::fs::remove_file(&control_path);
                        return Err(Error::vm_msg(format!(
                            "spawn_one: COW volume {}: {e}",
                            pristine_path.display()
                        )));
                    }
                },
                None => {
                    warn_legacy_volume_without_pristine_once(host_file);
                    host_file.clone()
                }
            }
        } else {
            host_file.clone()
        };
        cmd.arg("--volume").arg(format!(
            "{}:{}:{}",
            effective_host.display(),
            guest_path,
            size_bytes
        ));
    }

    cmd.arg("--memory").arg(self.memory_mib.to_string());
    cmd.arg("--vcpus").arg(self.vcpus.to_string());
    cmd.arg("--restore-from").arg(&self.snapshot_path);
    cmd.arg("--cow-restore");
    cmd.arg("--vsock-mux").arg(&vsock_mux_path);
    cmd.arg("--vsock-exec").arg(&vsock_exec_path);
    cmd.arg("--pool-worker").arg(&control_path);
    cmd.env(
        "SUPERMACHINE_HOST_IPV6",
        if crate::utils::net::cached_host_ipv6_route() {
            "1"
        } else {
            "0"
        },
    );
    if let Some(pages) = self.balloon_target_pages {
        cmd.arg("--balloon-target-pages").arg(pages.to_string());
    }
    if let Some(hex) = self.tsi_token.as_deref() {
        cmd.arg("--tsi-token").arg(hex);
    }
    if let Some(policy) = self.egress_policy.as_deref() {
        cmd.arg("--egress-policy").arg(policy);
    }
    if self.pre_exec_sync {
        cmd.env("SUPERMACHINE_PRE_EXEC_SYNC_RESUME", "1");
    }

    // Worker stdio: a log file when SUPERMACHINE_WORKER_LOG_FILE is set (every
    // `%` in it is replaced by the worker's hex suffix), inherited when
    // SUPERMACHINE_WORKER_LOG is 1/true, otherwise discarded.
    let log_to_stdio = std::env::var("SUPERMACHINE_WORKER_LOG")
        .map(|v| v == "1" || v == "true")
        .unwrap_or(false);
    let log_file_path = std::env::var("SUPERMACHINE_WORKER_LOG_FILE").ok().map(|p| {
        if p.contains('%') {
            p.replace('%', &format!("{suffix:016x}"))
        } else {
            p
        }
    });
    if let Some(path) = log_file_path.as_deref() {
        let f = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .ok();
        if let Some(f) = f {
            let f2 = f.try_clone().ok();
            cmd.stdout(Stdio::from(f));
            if let Some(f2) = f2 {
                cmd.stderr(Stdio::from(f2));
            } else {
                cmd.stderr(Stdio::null());
            }
        }
    } else if !log_to_stdio {
        cmd.stdout(Stdio::null()).stderr(Stdio::null());
    }

    if rss_admission_charge_enabled() {
        crate::memory_admission::await_pressure_relief();
    }
    let admission_guard = crate::memory_admission::admit(self.admission_charge_mib());
    let _spawn_permit = crate::spawn_concurrency::acquire();

    // Arm the guard BEFORE spawning so a failed `spawn()` also removes the
    // temp volume copies and the control socket. The vsock sockets and the
    // `.handoff` sibling are included because a killed child cannot remove
    // them itself; this is the same set `Worker::shutdown` deletes.
    // Declared after the admission guard and spawn permit so it drops first.
    let mut spawn_cleanup = {
        let mut cleanup_paths = volume_temp_files.clone();
        cleanup_paths.push(control_path.clone());
        cleanup_paths.push(vsock_mux_path.clone());
        cleanup_paths.push(vsock_exec_path.clone());
        cleanup_paths.push(vsock_mux_path.with_extension("handoff"));
        SpawnCleanup {
            child: None,
            cleanup_paths,
            armed: true,
        }
    };

    let __t0 = Instant::now();
    let child = cmd.spawn().map_err(|e| {
        Error::vm_msg(format!("spawn worker {}: {e}", self.worker_bin.display()))
    })?;
    let __t_spawned = __t0.elapsed();
    spawn_cleanup.child = Some(child);

    // From here on, every early return relies on `spawn_cleanup` for teardown.
    ctl_listener
        .set_nonblocking(true)
        .map_err(|e| Error::vm_msg(format!("set control listener nonblocking: {e}")))?;
    let deadline = Instant::now() + self.spawn_timeout;
    let mut backoff = Duration::from_millis(1);
    let stream = loop {
        match ctl_listener.accept() {
            Ok((s, _)) => break s,
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                if Instant::now() > deadline {
                    return Err(Error::vm_msg(format!(
                        "worker spawn: control connect did not arrive within {:?}",
                        self.spawn_timeout
                    )));
                }
                std::thread::sleep(backoff);
                backoff = (backoff * 2).min(Duration::from_millis(10));
            }
            Err(e) => {
                return Err(Error::vm_msg(format!("worker spawn: control accept: {e}")));
            }
        }
    };
    stream
        .set_nonblocking(false)
        .map_err(|e| Error::vm_msg(format!("set control stream blocking: {e}")))?;
    let writer = stream
        .try_clone()
        .map_err(|e| Error::vm_msg(format!("clone control stream: {e}")))?;
    let mut control = ControlChannel {
        reader: std::io::BufReader::new(stream),
        writer,
    };

    // Handshake: READY, then RESTORE -> DONE.
    let ready = control
        .read_line()
        .map_err(|e| Error::vm_msg(format!("worker spawn: read READY: {e}")))?;
    if ready.trim() != "READY" {
        return Err(Error::vm_msg(format!(
            "worker spawn: expected READY, got: {}",
            ready.trim()
        )));
    }
    let snap_path_str = self.snapshot_path.to_string_lossy().to_string();
    control
        .send_line(&format!("RESTORE {snap_path_str}"))
        .map_err(|e| Error::vm_msg(format!("worker spawn: send initial RESTORE: {e}")))?;
    let done = control
        .read_line()
        .map_err(|e| Error::vm_msg(format!("worker spawn: read initial DONE: {e}")))?;
    if !done.starts_with("DONE") {
        return Err(Error::vm_msg(format!(
            "worker spawn: expected initial DONE, got: {}",
            done.trim()
        )));
    }
    if crate::trace::enabled("timings") {
        eprintln!(
            "[spawn_one] spawn={:?} accept_to_done={:?} total={:?}",
            __t_spawned,
            __t0.elapsed() - __t_spawned,
            __t0.elapsed()
        );
    }

    if self.vcpus > 1 {
        let body = serde_json::json!({ "action": "smpark_unpark" });
        match crate::exec::send_control_with_ack(
            &vsock_exec_path,
            &body,
            Some(Duration::from_secs(2)),
        ) {
            Ok(_) => {}
            Err(e) => {
                tracing::debug!(
                    error = %e,
                    "post-restore smpark_unpark RPC error (continuing)"
                );
            }
        }
    }
    // Best-effort guest housekeeping; failures do not fail the spawn.
    if !self.mounts.is_empty() || !self.volumes.is_empty() {
        let _ = drop_vfs_caches_via_agent(&vsock_exec_path, self.memory_mib);
    }
    let _ = sync_time_via_agent(&vsock_exec_path);

    // Skip the protocol probe when the snapshot was baked by this exact
    // worker binary (matching hash).
    let probe_skip = self
        .baker_runtime_sha16
        .as_deref()
        .and_then(|stored| {
            current_worker_sha16(&self.worker_bin).map(|current| stored == current.as_str())
        })
        .unwrap_or(false);
    if !probe_skip {
        if let Err(e) = probe_agent_protocol(&vsock_exec_path, self.vcpus) {
            if self.vcpus > 1 && e.is_likely_multi_vcpu_restore_stall() {
                eprintln!(
                    "supermachine: WARNING multi-vCPU ({}vCPU) snapshot agent probe \
                     timed out — guest may be RCU-stalled after restore. \
                     Continuing (multi-vCPU is unsupported per design); \
                     workload exec will surface a real error if the guest is hung.",
                    self.vcpus
                );
            } else {
                return Err(e);
            }
        }
    }
    if !self.volumes.is_empty() {
        #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
        let gps: Vec<String> = self
            .volumes
            .iter()
            .map(|(_, gp, _, _)| gp.clone())
            .collect();
        #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
        if let Err(e) = wait_for_volumes_mounted(&vsock_exec_path, &gps) {
            return Err(e);
        }
    }

    // Success: disarm the guard and take ownership of the child back.
    spawn_cleanup.armed = false;
    let child = spawn_cleanup
        .child
        .take()
        .expect("spawn_cleanup holds the child until success");
    if let Some(fp) = crate::memory_admission::phys_footprint_mib(child.id()) {
        let prev = self
            .observed_footprint_mib
            .fetch_max(fp, std::sync::atomic::Ordering::Relaxed);
        if fp > prev {
            eprintln!(
                "[admission] observed worker footprint {fp} MiB (cap {} MiB) — \
                 new high-water-mark",
                self.memory_mib
            );
        }
    }
    Ok(Worker {
        child,
        vsock_mux_path,
        vsock_exec_path,
        control_path,
        control: Arc::new(Mutex::new(control)),
        last_restore_path: self.snapshot_path.clone(),
        memory_mib: self.memory_mib,
        has_mounts: !self.mounts.is_empty() || !self.volumes.is_empty(),
        volume_guest_paths: self
            .volumes
            .iter()
            .map(|(_, gp, _, _)| gp.clone())
            .collect(),
        vcpus: self.vcpus,
        has_cow_volumes: !volume_temp_files.is_empty(),
        volume_temp_files,
        _admission: admission_guard,
    })
}
}
/// Materialises a private, per-spawn copy of a pristine volume image.
///
/// The copy lands in the system temp directory under the name
/// `supermachine-vol-{pid}-{suffix:016x}-{index}.img`, where `pid` is the
/// current process id. That naming scheme is what
/// `sweep_stranded_volume_temps` parses to find files left by dead processes.
///
/// # Errors
/// Propagates whatever `crate::bake::cow_or_copy` reports.
fn cow_volume_for_spawn(pristine: &Path, index: usize, suffix: u64) -> std::io::Result<PathBuf> {
    let file_name = format!(
        "supermachine-vol-{}-{:016x}-{}.img",
        std::process::id(),
        suffix,
        index
    );
    let target = std::env::temp_dir().join(file_name);
    crate::bake::cow_or_copy(pristine, &target)?;
    Ok(target)
}
fn sweep_volume_temps_once() {
static SWEPT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
if SWEPT.get().is_some() {
return;
}
sweep_stranded_volume_temps();
let _ = SWEPT.set(());
}
fn sweep_stranded_volume_temps() {
let tmp = std::env::temp_dir();
let entries = match std::fs::read_dir(&tmp) {
Ok(e) => e,
Err(_) => return,
};
let me = std::process::id();
for entry in entries.flatten() {
let fname = entry.file_name();
let Some(name) = fname.to_str() else {
continue;
};
let Some(rest) = name.strip_prefix("supermachine-vol-") else {
continue;
};
if !name.ends_with(".img") {
continue;
}
let Some(dash) = rest.find('-') else {
continue;
};
let pid_str = &rest[..dash];
let Ok(pid) = pid_str.parse::<i32>() else {
continue;
};
if pid as u32 == me {
continue;
}
let alive = unsafe { libc::kill(pid, 0) };
if alive == 0 {
continue;
}
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(libc::ESRCH) {
continue;
}
let _ = std::fs::remove_file(entry.path());
}
}
/// Prints a one-time warning that a snapshot volume has no pristine copy.
///
/// `host_file` is only used in the message. The once-per-process gate is a
/// single `OnceLock::set` call, which succeeds for exactly one caller, so
/// concurrent first calls cannot both print the warning.
fn warn_legacy_volume_without_pristine_once(host_file: &Path) {
    static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
    // `set` returns `Err` for everyone except the first caller.
    if WARNED.set(()).is_err() {
        return;
    }
    eprintln!(
        "supermachine: WARNING snapshot lacks pristine volume bytes (host_file={}); \
workers will write directly to the host file, leaking state across \
processes and across `restore_on_release=true` acquires — re-bake \
under 0.7.49+ for per-spawn COW reset semantics",
        host_file.display()
    );
}
/// Lowest guest-agent protocol version this host code accepts;
/// `probe_agent_protocol` rejects any snapshot whose agent reports less.
const HOST_AGENT_PROTOCOL_MIN: u32 = 3;
/// Returns the first 16 hex characters of the SHA-256 of `worker_bin`.
///
/// The result is memoised in a process-wide single-entry cache keyed by
/// path, file length and modification time (nanoseconds since the Unix
/// epoch), so an unchanged binary is hashed only once.
///
/// Returns `None` if the file's metadata or contents cannot be read.
fn current_worker_sha16(worker_bin: &Path) -> Option<String> {
    use ring::digest::{Context, SHA256};
    use std::fmt::Write as _;
    use std::io::Read as _;

    static CACHE: std::sync::Mutex<Option<(PathBuf, u64, u128, String)>> =
        std::sync::Mutex::new(None);

    let stat = std::fs::metadata(worker_bin).ok()?;
    let size = stat.len();
    let modified_ns = stat
        .modified()
        .ok()?
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_nanos();

    // Cache hit: same path, size and mtime as the last hashed file.
    if let Ok(slot) = CACHE.lock() {
        match slot.as_ref() {
            Some((path, cached_size, cached_mtime, sha))
                if path == worker_bin && *cached_size == size && *cached_mtime == modified_ns =>
            {
                return Some(sha.clone());
            }
            _ => {}
        }
    }

    // Cache miss: stream the file through SHA-256 in 64 KiB chunks.
    let mut file = std::fs::File::open(worker_bin).ok()?;
    let mut hasher = Context::new(&SHA256);
    let mut chunk = [0u8; 64 * 1024];
    loop {
        match file.read(&mut chunk).ok()? {
            0 => break,
            read => hasher.update(&chunk[..read]),
        }
    }
    let full = hasher.finish();

    // 8 digest bytes render as the 16 leading hex characters.
    let mut sha16 = String::with_capacity(16);
    for byte in full.as_ref().iter().take(8) {
        let _ = write!(sha16, "{:02x}", byte);
    }

    if let Ok(mut slot) = CACHE.lock() {
        *slot = Some((worker_bin.to_path_buf(), size, modified_ns, sha16.clone()));
    }
    Some(sha16)
}
/// Asks the guest agent to drop its VFS caches; returns `true` on an
/// acknowledged request.
///
/// Controlled by `SUPERMACHINE_VFS_AUTO_INVALIDATE`:
/// * `0` — never send the request;
/// * `force` / `FORCE` — send it even for guests below 512 MiB;
/// * anything else — send it only when `memory_mib >= 512`.
///
/// The RPC uses a 5 second timeout; a failure is logged and reported as
/// `false`, never as an error.
fn drop_vfs_caches_via_agent(vsock_exec_path: &Path, memory_mib: u32) -> bool {
    let mode = std::env::var("SUPERMACHINE_VFS_AUTO_INVALIDATE").ok();
    let forced = match mode.as_deref() {
        Some("0") => return false,
        Some("force" | "FORCE") => true,
        _ => false,
    };
    if !forced && memory_mib < 512 {
        if crate::trace::enabled("vfs") {
            eprintln!(
                "[drop_vfs_caches] SKIP memory_mib={memory_mib} < 512 (set \
SUPERMACHINE_VFS_AUTO_INVALIDATE=force to override)"
            );
        }
        return false;
    }

    let request = serde_json::json!({ "action": "drop_vfs_caches" });
    let started = Instant::now();
    let verbose = crate::trace::enabled("vfs");
    let reply =
        crate::exec::send_control_with_ack(vsock_exec_path, &request, Some(Duration::from_secs(5)));
    let ack = match reply {
        Ok(ack) => ack,
        Err(e) => {
            if verbose {
                eprintln!(
                    "[drop_vfs_caches] FAIL in {} ms ({e})",
                    started.elapsed().as_millis()
                );
            }
            tracing::debug!(
                error = %e,
                "drop_vfs_caches RPC error (continuing with stale caches)"
            );
            return false;
        }
    };
    if verbose {
        eprintln!(
            "[drop_vfs_caches] OK in {} ms ack={ack:?}",
            started.elapsed().as_millis()
        );
    }
    true
}
/// Sends the host's current wall-clock time to the guest agent; returns
/// `true` when the agent acknowledged it.
///
/// Skipped (returns `false`) when `SUPERMACHINE_TIME_AUTO_SYNC=0` or when the
/// host clock reads before the Unix epoch. The timestamp is sent as
/// nanoseconds since the epoch, saturated to `u64::MAX`. RPC failures are
/// logged and reported as `false`.
fn sync_time_via_agent(vsock_exec_path: &Path) -> bool {
    if matches!(
        std::env::var("SUPERMACHINE_TIME_AUTO_SYNC").as_deref(),
        Ok("0")
    ) {
        return false;
    }
    let Ok(since_epoch) = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
    else {
        return false;
    };
    let host_unix_ns = u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX);
    let request = serde_json::json!({
        "action": "sync_time",
        "host_unix_ns": host_unix_ns,
    });

    let started = Instant::now();
    let verbose = crate::trace::enabled("time");
    let reply =
        crate::exec::send_control_with_ack(vsock_exec_path, &request, Some(Duration::from_secs(5)));
    let ack = match reply {
        Ok(ack) => ack,
        Err(e) => {
            if verbose {
                eprintln!("[sync_time] FAIL in {} ms ({e})", started.elapsed().as_millis());
            }
            tracing::debug!(
                error = %e,
                "sync_time RPC error (continuing with frozen wall-clock)"
            );
            return false;
        }
    };
    if verbose {
        eprintln!(
            "[sync_time] OK in {} ms ack={ack:?}",
            started.elapsed().as_millis()
        );
    }
    true
}
fn probe_agent_protocol(vsock_exec_path: &Path, vcpus: u32) -> Result<(), Error> {
let timeout = if vcpus > 1 {
Duration::from_secs(1)
} else {
Duration::from_secs(10)
};
let body = serde_json::json!({ "action": "probe" });
let ack = match crate::exec::send_control_with_ack(vsock_exec_path, &body, Some(timeout)) {
Ok(a) => a,
Err(e) => {
return Err(Error::vm_msg(format!(
"agent in this snapshot is from an older supermachine release \
(probe failed: {e}). Rebake the snapshot to pick up the new \
agent: rm -rf the snapshot dir and re-run your bake (e.g. \
`supermachine pull <image> --name <name>`)."
)));
}
};
let proto = ack.get("protocol").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
if proto < HOST_AGENT_PROTOCOL_MIN {
return Err(Error::vm_msg(format!(
"agent in this snapshot speaks protocol v{proto} but this \
supermachine library expects v{HOST_AGENT_PROTOCOL_MIN}+. The \
snapshot was baked against a previous release; rebake to pick up \
the new agent (rm -rf the snapshot dir and re-run your bake)."
)));
}
Ok(())
}
/// Debug output shows the socket directory plus the `alive` and idle counts.
/// If the state mutex is poisoned both counts are printed as `usize::MAX`.
impl std::fmt::Debug for HiddenPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (alive, idle) = match self.state.lock() {
            Ok(guard) => (guard.alive, guard.idle.len()),
            Err(_) => (usize::MAX, usize::MAX),
        };
        f.debug_struct("HiddenPool")
            .field("socks_dir", &self.socks_dir)
            .field("alive", &alive)
            .field("idle", &idle)
            .finish()
    }
}
impl HiddenPool {
    /// Marks the pool as shutting down, wakes every waiter, and shuts down
    /// all workers currently parked in the idle list and the dirty queue.
    ///
    /// `alive` is decremented once per worker shut down. If the state mutex
    /// (or the dirty-queue mutex) cannot be locked, the corresponding workers
    /// are left untouched.
    fn shutdown_pool(&self) {
        self.shutting_down.store(true, Ordering::SeqCst);
        self.available.notify_all();
        if let Some(pending) = &self.dirty_pending {
            pending.notify_all();
        }

        let Ok(mut state) = self.state.lock() else {
            return;
        };
        while let Some(mut entry) = state.idle.pop() {
            entry.worker.shutdown();
            state.alive = state.alive.saturating_sub(1);
        }

        // The dirty queue is drained while the state lock is still held.
        let Some(dirty) = self.dirty.as_ref() else {
            return;
        };
        let Ok(mut queue) = dirty.lock() else {
            return;
        };
        while let Some(mut worker) = queue.pop_front() {
            worker.shutdown();
            state.alive = state.alive.saturating_sub(1);
        }
    }
}
/// Dropping the pool shuts down its parked workers and then removes the
/// socket directory (best-effort; a removal failure is ignored).
impl Drop for HiddenPool {
    fn drop(&mut self) {
        self.shutdown_pool();
        std::fs::remove_dir_all(&self.socks_dir).ok();
    }
}
impl HiddenPool {
/// Hands out a worker: reuses a live idle one, spawns a new one while the
/// pool is below `policy.max`, or blocks on the `available` condvar until
/// either becomes possible.
///
/// # Errors
/// * `Error::vm_msg` when the state mutex/condvar is poisoned or the pool is
///   shutting down;
/// * whatever `spawn_one` returns when a fresh spawn fails;
/// * `Error::pool_exhausted` when `policy.acquire_timeout` elapses first.
fn acquire(&self) -> Result<Worker, Error> {
let acquire_t0 = Instant::now();
let acquire_timeout = self.policy.acquire_timeout;
let mut state = self
.state
.lock()
.map_err(|_| Error::vm_msg("pool mutex poisoned".to_owned()))?;
loop {
// 1. Prefer an idle worker. Entries whose child has exited (or whose
//    status cannot be queried) are discarded and no longer counted alive.
while let Some(mut entry) = state.idle.pop() {
match entry.worker.child.try_wait() {
// No exit status yet: the worker is usable.
Ok(None) => {
if !self.policy.restore_on_release {
let idle_for = entry.last_used.elapsed();
// A reused worker that sat idle for over a second gets its clock
// re-synced; the pool lock is released first so the RPC does not
// block other callers.
if idle_for > Duration::from_secs(1) {
let exec_path = entry.worker.vsock_exec_path.clone();
drop(state);
let _ = sync_time_via_agent(&exec_path);
return Ok(entry.worker);
}
}
return Ok(entry.worker); }
Ok(Some(_status)) => {
state.alive = state.alive.saturating_sub(1);
continue; }
Err(_) => {
state.alive = state.alive.saturating_sub(1);
continue;
}
}
}
if self.shutting_down.load(Ordering::SeqCst) {
return Err(Error::vm_msg("pool is shutting down".to_owned()));
}
// 2. Room to grow: reserve the slot under the lock, then spawn without
//    holding it. On failure the slot is given back and waiters are woken.
if state.alive < self.policy.max {
state.alive += 1;
drop(state);
let spawned = self.spawn_cfg.spawn_one();
match spawned {
Ok(w) => return Ok(w),
Err(e) => {
if let Ok(mut s) = self.state.lock() {
s.alive = s.alive.saturating_sub(1);
}
self.available.notify_all();
return Err(e);
}
}
}
// 3. Pool is at max: give up if the deadline already passed, otherwise
//    wait for a notification (bounded by the remaining time, if any).
if let Some(total) = acquire_timeout {
if acquire_t0.elapsed() >= total {
return Err(Error::pool_exhausted(format!(
"acquire timed out after {total:?}; pool at max ({})",
self.policy.max
)));
}
}
state.waiting += 1;
let (new_state, timed_out) = match acquire_timeout {
None => match self.available.wait(state) {
Ok(s) => (s, false),
Err(_) => return Err(Error::vm_msg("pool condvar poisoned".to_owned())),
},
Some(total) => {
let remaining = total.saturating_sub(acquire_t0.elapsed());
match self.available.wait_timeout(state, remaining) {
Ok((s, r)) => (s, r.timed_out()),
Err(_) => return Err(Error::vm_msg("pool condvar poisoned".to_owned())),
}
}
};
state = new_state;
state.waiting = state.waiting.saturating_sub(1);
// NOTE(review): a timed-out wait returns immediately without one last
// look at `idle`; a worker released right at the deadline is not picked
// up by this caller — confirm that is intended.
if timed_out {
return Err(Error::pool_exhausted(format!(
"acquire timed out after {:?}; pool at max ({})",
acquire_timeout.unwrap_or_default(),
self.policy.max
)));
}
}
}
/// Returns a worker to the pool.
///
/// * Without `restore_on_release` the worker goes straight back onto the
///   idle list and waiters are notified.
/// * With `restore_on_release`, a worker that has COW volumes is shut down
///   (and `alive` decremented); any other worker is queued on the dirty
///   queue when one exists, otherwise it is shut down as well.
fn release(&self, worker: Worker) {
    // Shut a worker down for good and free its slot.
    let retire = |mut doomed: Worker| {
        doomed.shutdown();
        if let Ok(mut state) = self.state.lock() {
            state.alive = state.alive.saturating_sub(1);
        }
        self.available.notify_all();
    };

    if !self.policy.restore_on_release {
        if let Ok(mut state) = self.state.lock() {
            state.idle.push(IdleEntry {
                worker,
                last_used: Instant::now(),
            });
            self.available.notify_all();
        }
        return;
    }

    if worker.has_cow_volumes {
        retire(worker);
        return;
    }

    match self.dirty.as_ref() {
        Some(queue) => {
            if let Ok(mut pending) = queue.lock() {
                pending.push_back(worker);
            }
            if let Some(signal) = self.dirty_pending.as_ref() {
                signal.notify_all();
            }
        }
        None => retire(worker),
    }
}
}
impl Image {
/// Loads an image from a snapshot directory or snapshot file, requiring the
/// files it references to be present on disk.
pub fn from_snapshot(path: impl Into<PathBuf>) -> Result<Self, Error> {
    let path: PathBuf = path.into();
    Self::from_snapshot_inner(path, false)
}
/// Like `from_snapshot`, but with `allow_pending` set: the loader skips its
/// existence checks for the snapshot file, layers and delta squashfs.
pub(crate) fn from_snapshot_pending(path: impl Into<PathBuf>) -> Result<Self, Error> {
    let path: PathBuf = path.into();
    Self::from_snapshot_inner(path, true)
}
fn from_snapshot_inner(path: PathBuf, allow_pending: bool) -> Result<Self, Error> {
let (snapshot_path, metadata_path) = if path.is_dir() {
(path.join("restore.snap"), path.join("metadata.json"))
} else if path.is_file() {
let parent = path.parent().ok_or_else(|| {
Error::image_msg(format!(
"snapshot path has no parent dir: {}",
path.display()
))
})?;
(path.clone(), parent.join("metadata.json"))
} else if allow_pending {
return Err(Error::image_msg(format!(
"snapshot path not found: {}",
path.display()
)));
} else {
return Err(Error::image_msg(format!(
"snapshot path not found: {}",
path.display()
)));
};
if !metadata_path.is_file() {
return Err(Error::image_msg(format!(
"metadata.json not found alongside snapshot at {}",
metadata_path.display()
)));
}
let meta_text = std::fs::read_to_string(&metadata_path)
.map_err(|e| Error::image_msg(format!("read {}: {e}", metadata_path.display())))?;
let meta: serde_json::Value = serde_json::from_str(&meta_text)
.map_err(|e| Error::image_msg(format!("parse {}: {e}", metadata_path.display())))?;
let backend = meta
.get("backend")
.and_then(|v| v.as_str())
.unwrap_or("hvf")
.to_string();
if !allow_pending && backend != "kvm" && !snapshot_path.is_file() {
return Err(Error::image_msg(format!(
"snapshot file not found: {}",
snapshot_path.display()
)));
}
warn_if_snapshot_version_mismatch(&meta, &snapshot_path);
let memory_mib = meta
.get("memory_mib")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(256);
let vcpus = meta
.get("vcpus")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(1);
let metadata_dir = metadata_path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| PathBuf::from("."));
let resolve_path = |s: &str| -> PathBuf {
let p = PathBuf::from(s);
if p.is_absolute() {
p
} else {
metadata_dir.join(p)
}
};
let layers: Vec<PathBuf> = meta
.get("layers")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(resolve_path))
.collect()
})
.unwrap_or_default();
let delta_squashfs = meta
.get("delta_squashfs")
.and_then(|v| v.as_str())
.map(resolve_path);
if !allow_pending {
for layer in &layers {
if !layer.is_file() {
return Err(Error::image_msg(format!(
"snapshot at {} references missing layer file {}: a sibling \
snapshot dir was likely deleted (e.g. the base dir of a \
`__warm__<tag>` variant). Delete this dir too and re-bake.",
snapshot_path.display(),
layer.display()
)));
}
}
if let Some(delta) = &delta_squashfs {
if !delta.is_file() {
return Err(Error::image_msg(format!(
"snapshot at {} references missing delta squashfs {}: a \
sibling snapshot dir was likely deleted (e.g. the base dir \
of a `__warm__<tag>` variant). Delete this dir too and re-bake.",
snapshot_path.display(),
delta.display()
)));
}
}
}
let mounts: Vec<crate::vmm::resources::MountSpec> = meta
.get("mounts")
.and_then(|v| v.as_array())
.map(|arr| -> Result<Vec<_>, Error> {
arr.iter()
.filter_map(|x| {
let host = x.get("host_path")?.as_str()?;
let tag = x.get("guest_tag")?.as_str()?;
let guest_path = match x.get("guest_path").and_then(|v| v.as_str()) {
Some(p) => p,
None => {
return Some(Err(Error::bake_msg(format!(
"snapshot mount entry for tag `{tag}` has no \
`guest_path` — this snapshot was baked by \
supermachine 0.7.27 or earlier, which used a \
legacy non-auto-mount API. Re-bake with \
`MountSpec::new(host, tag, guest_path)` to \
pin a guest mount point."
))));
}
};
let policy = x
.get("symlinks")
.and_then(|v| v.as_str())
.and_then(parse_symlink_policy)
.unwrap_or_default();
let m = crate::vmm::resources::MountSpec::new(host, tag, guest_path)
.with_symlinks(policy);
Some(Ok(m))
})
.collect()
})
.transpose()?
.unwrap_or_default();
let volumes: Vec<(PathBuf, String, u64, Option<PathBuf>)> = meta
.get("volumes")
.and_then(|v| v.as_array())
.map(|arr| -> Vec<(PathBuf, String, u64, Option<PathBuf>)> {
arr.iter()
.filter_map(|x| {
let host_file = x.get("host_file")?.as_str()?;
let guest_path = x.get("guest_path")?.as_str()?;
let size_bytes = x
.get("size_bytes")
.and_then(|v| v.as_u64())
.unwrap_or(crate::vmm::resources::VolumeSpec::DEFAULT_SIZE_BYTES);
let pristine = x
.get("pristine")
.and_then(|v| v.as_str())
.map(PathBuf::from)
.filter(|p| p.is_file());
Some((
PathBuf::from(host_file),
guest_path.to_owned(),
size_bytes,
pristine,
))
})
.collect()
})
.unwrap_or_default();
let bundled_kernel = {
let cand = metadata_dir.join("kernel");
if cand.is_file() {
Some(cand)
} else {
None
}
};
let baker_runtime_sha16 = meta
.get("runtime_sha16")
.and_then(|v| v.as_str())
.map(|s| s.to_owned());
let balloon_target_pages = meta
.get("balloon_target_pages")
.and_then(|v| v.as_u64())
.and_then(|n| u32::try_from(n).ok())
.filter(|n| *n > 0);
let tsi_token = meta
.get("tsi_token")
.and_then(|v| v.as_str())
.filter(|s| s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()))
.map(|s| s.to_ascii_lowercase());
let egress_policy = meta
.get("egress_policy")
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty());
let pre_exec_sync = meta
.get("pre_exec_sync")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let kvm = if backend == "kvm" {
let kernel = meta
.get("kvm_kernel")
.and_then(|v| v.as_str())
.map(resolve_path);
let initrd = meta
.get("kvm_initrd")
.and_then(|v| v.as_str())
.map(resolve_path);
let disk = meta
.get("kvm_disk")
.and_then(|v| v.as_str())
.map(resolve_path);
let snapshot = meta
.get("kvm_snapshot")
.and_then(|v| v.as_str())
.map(resolve_path);
if snapshot.is_none() && (kernel.is_none() || initrd.is_none()) {
return Err(Error::image_msg(
"metadata.json \"backend\":\"kvm\" needs either \"kvm_snapshot\", \
or both \"kvm_kernel\" and \"kvm_initrd\""
.to_owned(),
));
}
Some(KvmImageParts {
kernel,
initrd,
disk,
snapshot,
})
} else {
None
};
Ok(Self {
snapshot_path,
memory_mib,
vcpus,
baker_runtime_sha16,
balloon_target_pages,
tsi_token,
egress_policy,
pre_exec_sync,
layers,
delta_squashfs,
mounts,
volumes,
bundled_kernel,
kvm,
hidden_pool: std::sync::OnceLock::new(),
warm_baked_worker: Arc::new(crate::bake::WarmStash::new(None)),
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
kvm_pool: std::sync::OnceLock::new(),
})
}
/// Resolves `image_ref` to an image using the default [`PullPolicy`] and the
/// default snapshots directory.
pub fn from_oci(image_ref: &str) -> Result<Self, Error> {
    let policy = PullPolicy::default();
    Self::from_oci_with_policy(image_ref, policy)
}
/// Resolves `image_ref` to an image under the given pull policy, using the
/// default snapshots directory and a name derived from the image reference.
pub fn from_oci_with_policy(image_ref: &str, policy: PullPolicy) -> Result<Self, Error> {
    Self::from_oci_to_dir(image_ref, policy, &default_snapshots_dir(), None)
}
/// Resolves `image_ref` to an image stored under `snapshots_dir`, baking it
/// when the pull policy asks for (or allows) that.
///
/// The snapshot lives in `snapshots_dir/<name>`; without an explicit `name`
/// one is derived via `crate::bake::snapshot_name_for_image`.
///
/// * `PullPolicy::Never` — only an existing, loadable snapshot is returned;
///   otherwise `cache_invalid` (a `restore.snap` exists but does not load) or
///   `cache_miss`.
/// * `PullPolicy::Missing` — a loadable snapshot is reused, else a bake runs.
/// * any other policy — a bake always runs.
///
/// For policies other than `Never`, a loadable snapshot that was baked under
/// a different version is invalidated and re-baked.
pub fn from_oci_to_dir(
    image_ref: &str,
    policy: PullPolicy,
    snapshots_dir: &Path,
    name: Option<&str>,
) -> Result<Self, Error> {
    let dir_name = match name {
        Some(explicit) => explicit.to_owned(),
        None => crate::bake::snapshot_name_for_image(image_ref),
    };
    let snap_dir = snapshots_dir.join(&dir_name);
    let never_pull = matches!(policy, PullPolicy::Never);

    // Probe the cache; a version-mismatched snapshot counts as unusable
    // (and is wiped) unless the caller forbade rebaking.
    let mut cache_loadable = Self::from_snapshot(&snap_dir).is_ok();
    if cache_loadable && !never_pull {
        if let Some(baked) = snap_dir_baked_under_other_version(&snap_dir) {
            let current = env!("CARGO_PKG_VERSION");
            eprintln!(
                "supermachine: snapshot at {} was baked under v{baked}; \
current binaries are v{current}. Auto-rebaking.",
                snap_dir.display()
            );
            invalidate_stale_snapshot_tree(&snap_dir);
            cache_loadable = false;
        }
    }

    if never_pull {
        if cache_loadable {
            return Self::from_snapshot(&snap_dir);
        }
        if snap_dir.join("restore.snap").is_file() {
            return Err(Error::cache_invalid(format!(
                "snapshot present at {} but not loadable on this binary; \
rebake required (PullPolicy::Never won't auto-rebake)",
                snap_dir.display()
            )));
        }
        return Err(Error::cache_miss(format!(
            "no cached snapshot for {image_ref} at {} (PullPolicy::Never)",
            snap_dir.display()
        )));
    }
    if cache_loadable && matches!(policy, PullPolicy::Missing) {
        return Self::from_snapshot(&snap_dir);
    }

    // Cache unusable (or policy demands a fresh pull): bake.
    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
    return Self::bake_kvm_auto(image_ref, &snap_dir);
    #[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
    {
        let root = repo_root_for_bake()?;
        let request = crate::bake::BakeRequest {
            image: image_ref.to_owned(),
            name: name.map(str::to_owned),
            runtime: String::from("supermachine"),
            guest_port: 80,
            memory_mib: 256,
            vcpus: 1,
            pull_policy: policy.as_bake_str().to_owned(),
            snapshots_dir: snapshots_dir.to_path_buf(),
            cmd_override: None,
            extra_args: Vec::new(),
            platform: String::from("linux/arm64"),
        };
        let started = Instant::now();
        crate::bake::run_push(&request, started, &root)
            .map_err(|e| map_bake_error(&request.image, e))?;
        Self::from_snapshot(&snap_dir)
    }
}
/// Starts an [`OciImageBuilder`] for `image_ref`.
pub fn builder(image_ref: impl Into<String>) -> OciImageBuilder {
OciImageBuilder::new(image_ref)
}
/// Builds an image named `name` from `image_ref`, letting `configure` adjust
/// the builder before `build()` is called.
pub fn ensure_baked<F>(
    name: impl Into<String>,
    image_ref: impl Into<String>,
    configure: F,
) -> Result<Image, Error>
where
    F: FnOnce(OciImageBuilder) -> OciImageBuilder,
{
    let base = OciImageBuilder::new(image_ref).with_name(name);
    configure(base).build()
}
/// Path of the snapshot file this image was loaded from.
pub fn snapshot_path(&self) -> &Path {
&self.snapshot_path
}
/// Guest memory size, in MiB, recorded for this image.
pub fn memory_mib(&self) -> u32 {
self.memory_mib
}
/// Number of vCPUs recorded for this image.
pub fn vcpus(&self) -> u32 {
self.vcpus
}
/// Test-only accessor for the `balloon_target_pages` value read from the
/// snapshot metadata (`None` when absent, zero, or out of `u32` range).
#[doc(hidden)]
pub fn balloon_target_pages_for_test(&self) -> Option<u32> {
self.balloon_target_pages
}
/// Starts a VM from this image with the given configuration; delegates to
/// `Vm::start`.
#[cfg(any(
all(target_os = "macos", target_arch = "aarch64"),
all(target_os = "linux", target_arch = "x86_64")
))]
pub fn start(&self, config: &VmConfig) -> Result<Vm, Error> {
Vm::start(self, config)
}
/// Bakes a KVM image from layer tarballs without a workload script.
///
/// Equivalent to `bake_kvm_with_workload` with `workload_script = None`.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn bake_kvm(
    layer_tars: &[PathBuf],
    kernel: &Path,
    agent_initrd: &Path,
    dest_dir: impl Into<PathBuf>,
) -> Result<Image, Error> {
    let no_workload: Option<&str> = None;
    Self::bake_kvm_with_workload(layer_tars, kernel, agent_initrd, no_workload, dest_dir)
}
pub fn bake_kvm_with_workload(
layer_tars: &[PathBuf],
kernel: &Path,
agent_initrd: &Path,
workload_script: Option<&str>,
dest_dir: impl Into<PathBuf>,
) -> Result<Image, Error> {
let dest = dest_dir.into();
std::fs::create_dir_all(&dest).map_err(Error::Io)?;
let rootfs = dest.join("rootfs.squashfs");
let stage = dest.join("rootfs-stage");
crate::bake::build_kvm_rootfs_squashfs(layer_tars, &stage, &rootfs, workload_script)
.map_err(Error::bake_msg)?;
let _ = std::fs::remove_dir_all(&stage);
let phys = std::fs::metadata(&rootfs).map(|m| m.len()).unwrap_or(0)
+ std::fs::metadata(kernel).map(|m| m.len()).unwrap_or(0)
+ std::fs::metadata(agent_initrd)
.map(|m| m.len())
.unwrap_or(0);
let metadata = serde_json::json!({
"backend": "kvm",
"kvm_kernel": kernel.to_string_lossy(),
"kvm_initrd": agent_initrd.to_string_lossy(),
"kvm_disk": rootfs.to_string_lossy(),
"memory_mib": 512,
"vcpus": 1,
"balloon_target_pages": crate::bake::compute_balloon_target_pages(512),
"baked_at": chrono_rfc3339_now(),
"snapshot_physical_bytes": phys,
});
std::fs::write(
dest.join("metadata.json"),
serde_json::to_string_pretty(&metadata)
.map_err(|e| Error::bake_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
Image::from_snapshot(&dest)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn bake_kvm_from_ref(
image_ref: &str,
kernel: &Path,
agent_initrd: &Path,
dest_dir: impl Into<PathBuf>,
) -> Result<Image, Error> {
let dest = dest_dir.into();
std::fs::create_dir_all(&dest).map_err(Error::Io)?;
let pull_work = dest.join("oci-pull");
let t_pull = std::time::Instant::now();
let (layers, workload) = crate::bake::pull_oci_layers(image_ref, "amd64", &pull_work)
.map_err(Error::bake_msg)?;
let pull_ms = t_pull.elapsed().as_millis();
let t_build = std::time::Instant::now();
let img = Self::bake_kvm_with_workload(
&layers,
kernel,
agent_initrd,
workload.as_deref(),
dest.clone(),
)?;
if crate::trace::enabled("run") {
eprintln!(
"supermachine: kvm bake phases pull_oci_ms={} build_rootfs_ms={}",
pull_ms,
t_build.elapsed().as_millis()
);
}
let meta_path = dest.join("metadata.json");
if let Ok(text) = std::fs::read_to_string(&meta_path) {
if let Ok(mut json) = serde_json::from_str::<serde_json::Value>(&text) {
if let Some(obj) = json.as_object_mut() {
obj.insert("image".to_string(), serde_json::json!(image_ref));
if let Ok(pretty) = serde_json::to_string_pretty(&json) {
let _ = std::fs::write(&meta_path, pretty);
}
}
}
}
Ok(img)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn bake_kvm_auto(image_ref: &str, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
let dest = dest_dir.into();
std::fs::create_dir_all(&dest).map_err(Error::Io)?;
let t_assets = std::time::Instant::now();
let (kernel, initrd) = Self::extract_bundled_kvm_boot_assets(&dest)?;
if crate::trace::enabled("run") {
eprintln!(
"supermachine: kvm boot-assets extract_ms={}",
t_assets.elapsed().as_millis()
);
}
Self::bake_kvm_from_ref(image_ref, &kernel, &initrd, dest)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn extract_bundled_kvm_boot_assets(dest: &Path) -> Result<(PathBuf, PathBuf), Error> {
let kernel = dest.join("kernel");
match std::env::var_os("SUPERMACHINE_KVM_KERNEL") {
Some(p) if std::path::Path::new(&p).is_file() => {
std::fs::copy(&p, &kernel).map_err(Error::Io)?;
}
_ => supermachine_kernel::extract_kernel_to(&kernel).map_err(Error::Io)?,
}
let assets = dest.join("agent-assets");
std::fs::create_dir_all(&assets).map_err(Error::Io)?;
let agent_bin = assets.join("supermachine-agent");
let busybox_bin = assets.join("busybox");
supermachine_kernel::extract_supermachine_agent_to(&agent_bin).map_err(Error::Io)?;
supermachine_kernel::extract_busybox_to(&busybox_bin).map_err(Error::Io)?;
let initrd = dest.join("agent.cpio");
crate::bake::build_kvm_agent_initramfs(&agent_bin, &busybox_bin, &[], &initrd)
.map_err(Error::bake_msg)?;
let _ = std::fs::remove_dir_all(&assets);
Ok((kernel, initrd))
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn bake_kvm_from_squashfs(
squashfs: &Path,
kernel: &Path,
agent_initrd: &Path,
dest_dir: impl Into<PathBuf>,
) -> Result<Image, Error> {
let dest = dest_dir.into();
std::fs::create_dir_all(&dest).map_err(Error::Io)?;
let rootfs = dest.join("rootfs.squashfs");
if squashfs != rootfs {
std::fs::copy(squashfs, &rootfs).map_err(Error::Io)?;
}
let metadata = serde_json::json!({
"backend": "kvm",
"kvm_kernel": kernel.to_string_lossy(),
"kvm_initrd": agent_initrd.to_string_lossy(),
"kvm_disk": rootfs.to_string_lossy(),
"memory_mib": 512,
"vcpus": 1,
"balloon_target_pages": crate::bake::compute_balloon_target_pages(512),
});
std::fs::write(
dest.join("metadata.json"),
serde_json::to_string_pretty(&metadata)
.map_err(|e| Error::bake_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
Image::from_snapshot(&dest)
}
/// Bakes a KVM image from an existing squashfs using the kernel and agent
/// initrd extracted into `dest_dir` by `extract_bundled_kvm_boot_assets`.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn bake_kvm_from_squashfs_auto(
    squashfs: &Path,
    dest_dir: impl Into<PathBuf>,
) -> Result<Image, Error> {
    let dest: PathBuf = dest_dir.into();
    std::fs::create_dir_all(&dest).map_err(Error::Io)?;
    let (kernel, initrd) = Self::extract_bundled_kvm_boot_assets(&dest)?;
    Self::bake_kvm_from_squashfs(squashfs, &kernel, &initrd, dest)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn build_kvm_initramfs(
agent_bin: &Path,
busybox_bin: &Path,
ordered_modules: &[PathBuf],
out_cpio: &Path,
) -> Result<(), Error> {
crate::bake::build_kvm_agent_initramfs(agent_bin, busybox_bin, ordered_modules, out_cpio)
.map_err(Error::bake_msg)
}
/// Acquires a pooled VM using a default `VmConfig`.
#[cfg(any(
    all(target_os = "macos", target_arch = "aarch64"),
    all(target_os = "linux", target_arch = "x86_64")
))]
pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
    let defaults = VmConfig::new();
    self.acquire_with(&defaults)
}
/// Acquires a pooled VM on macOS/aarch64.
///
/// Ensures the image's default hidden pool exists (building it from `config`
/// on first use), takes a worker from it, and wraps the worker's vsock paths
/// in a `Vm`. `memory_mib`/`vcpus` overrides in `config` take precedence over
/// the image's own values in the attached metadata.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn acquire_with(&self, config: &VmConfig) -> Result<PooledVm<'_>, Error> {
    let _span = tracing::info_span!(
        "supermachine.acquire",
        memory_mib = self.memory_mib,
        vcpus = self.vcpus,
    )
    .entered();

    let pool_arc = self.ensure_default_pool(config)?;
    let worker = pool_arc.acquire()?;

    let meta = ImageMeta {
        memory_mib: config.memory_mib.unwrap_or(self.memory_mib),
        vcpus: config.vcpus.unwrap_or(self.vcpus),
        layers: self.layers.clone(),
        delta_squashfs: self.delta_squashfs.clone(),
    };
    let vm = Vm {
        pool: None,
        vsock_mux_path: worker.vsock_mux_path.clone(),
        vsock_exec_path: worker.vsock_exec_path.clone(),
        own_vsock_mux_dir: None,
        skip_cleanup: true,
        image_meta: Some(Arc::new(meta)),
    };

    Ok(PooledVm {
        vm: Some(vm),
        worker: Some(worker),
        pool_arc: Arc::clone(pool_arc),
        _image: std::marker::PhantomData,
    })
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn acquire_with(&self, config: &VmConfig) -> Result<PooledVm<'_>, Error> {
if self.kvm_pool.get().is_none() {
if let Ok(warm) = warm_snapshot_for_pool(self, config) {
let pol = PoolPolicy::default();
let inner = Arc::new(KvmPoolInner {
image: warm,
vm_config: config.clone(),
min: 0,
idle: std::sync::Mutex::new(Vec::new()),
admission: Admission::new(pol.max, pol.acquire_timeout),
restore_on_release: pol.restore_on_release,
refilling: std::sync::atomic::AtomicBool::new(false),
shut_down: std::sync::atomic::AtomicBool::new(false),
});
let _ = self.kvm_pool.set(Arc::new(Pool { kvm: inner }));
}
}
if let Some(pool) = self.kvm_pool.get() {
return pool.acquire();
}
let vm = Vm::start(self, config)?;
Ok(PooledVm {
vm: Some(vm),
pool: None,
_image: std::marker::PhantomData,
})
}
/// Starts a `PoolBuilder` for this image, seeded with the default
/// `PoolPolicy` and a default `VmConfig`.
pub fn pool(&self) -> PoolBuilder<'_> {
    let policy = PoolPolicy::default();
    let vm_config = VmConfig::new();
    PoolBuilder {
        image: self,
        policy,
        vm_config,
    }
}
/// Builds a worker pool for this image: resolves and verifies the worker
/// binary, prepares the socket directory, pre-spawns `policy.min` workers
/// (reusing a still-running warm-baked worker if one is stashed), and starts
/// the pool's background threads.
///
/// # Errors
/// `Error::Assets` when the worker binary cannot be located or staged,
/// `Error::Vm` for version/codesign failures, worker-spawn failures or
/// thread-spawn failures, and `Error::Io` when the socket directory cannot be
/// created.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn build_pool_arc(
&self,
config: &VmConfig,
policy: PoolPolicy,
) -> Result<Arc<HiddenPool>, Error> {
// Clean up volume copies stranded by dead processes (once per process).
sweep_volume_temps_once();
// Worker binary: an explicit SUPERMACHINE_WORKER_BIN is used as-is;
// otherwise the bundled/located binary is staged through
// `ensure_worker_in_user_dir`.
#[cfg(target_os = "macos")]
let worker_bin = {
if let Some(p) = std::env::var_os("SUPERMACHINE_WORKER_BIN") {
let p = PathBuf::from(p);
if !p.is_file() {
return Err(Error::assets_msg(format!(
"SUPERMACHINE_WORKER_BIN={} does not exist or is not a file",
p.display()
)));
}
p
} else {
let source =
if let Some(bundled) = std::env::var_os("SUPERMACHINE_WORKER_BIN_BUNDLED") {
let p = PathBuf::from(bundled);
if !p.is_file() {
return Err(Error::assets_msg(format!(
"SUPERMACHINE_WORKER_BIN_BUNDLED={} does not exist or is not a file",
p.display()
)));
}
p
} else {
crate::codesign::locate_worker_bin().ok_or_else(|| {
Error::assets_msg(
"supermachine-worker binary not found (looked for sibling of \
current_exe and target/release/supermachine-worker). Set \
SUPERMACHINE_WORKER_BIN if you have it elsewhere."
.to_owned(),
)
})?
};
crate::assets::ensure_worker_in_user_dir(&source).map_err(Error::assets_msg)?
}
};
// NOTE(review): the enclosing function is already gated to macOS, so this
// non-macOS branch looks unreachable — confirm before relying on it.
#[cfg(not(target_os = "macos"))]
let worker_bin: PathBuf = std::env::var_os("SUPERMACHINE_WORKER_BIN")
.map(PathBuf::from)
.ok_or_else(|| {
Error::assets_msg("SUPERMACHINE_WORKER_BIN must be set on this platform".to_owned())
})?;
// The worker must pass the version check and be code-signed before use.
#[cfg(target_os = "macos")]
{
crate::codesign::verify_worker_version(&worker_bin).map_err(Error::vm_msg)?;
if let Err(e) = crate::codesign::ensure_worker_signed(&worker_bin) {
return Err(Error::vm_msg(format!(
"supermachine-worker codesign failed: {e}"
)));
}
}
// Socket directory: caller-provided, or a per-process unique dir in /tmp.
let socks_dir = match &config.vsock_mux_dir {
Some(d) => d.clone(),
None => PathBuf::from(format!(
"/tmp/supermachine-pool-{}-{:x}",
std::process::id(),
unique_suffix(),
)),
};
std::fs::create_dir_all(&socks_dir).map_err(Error::Io)?;
// Best-effort scrub of $HOME/.cache/supermachine/rosetta-aot, if present.
if let Some(home) = std::env::var_os("HOME") {
let cache_dir = PathBuf::from(home)
.join(".cache")
.join("supermachine")
.join("rosetta-aot");
if cache_dir.is_dir() {
let _ = scrub_aotcache_orphans(&cache_dir);
}
}
// Per-pool overrides fall back to the image's values; restore timeout
// defaults to 30 seconds.
let memory_mib = config.memory_mib.unwrap_or(self.memory_mib);
let vcpus = config.vcpus.unwrap_or(self.vcpus);
let spawn_timeout = config
.restore_timeout
.unwrap_or_else(|| Duration::from_secs(30));
let name_prefix = "w".to_owned();
let spawn_cfg = Arc::new(SpawnConfig {
observed_footprint_mib: std::sync::atomic::AtomicU64::new(0),
worker_bin,
snapshot_path: self.snapshot_path.clone(),
layers: self.layers.clone(),
delta_squashfs: self.delta_squashfs.clone(),
mounts: self.mounts.clone(),
volumes: self.volumes.clone(),
memory_mib,
vcpus,
socks_dir: socks_dir.clone(),
name_prefix,
spawn_timeout,
baker_runtime_sha16: self.baker_runtime_sha16.clone(),
balloon_target_pages: self.balloon_target_pages,
tsi_token: self.tsi_token.clone(),
egress_policy: self.egress_policy.clone(),
pre_exec_sync: self.pre_exec_sync,
restore_on_release: policy.restore_on_release,
});
// Claim the stashed warm-baked worker only if its child has not exited.
let claimed_warm = self.warm_baked_worker.take().and_then(|mut bw| {
match bw.child.try_wait() {
Ok(None) => Some(bw), Ok(Some(_)) => None, Err(_) => None, }
});
let initial = policy.min;
let mut idle: Vec<IdleEntry> = Vec::with_capacity(initial.max(1));
let extra_warm: usize;
if let Some(bw) = claimed_warm {
// Flag passed to `warm_baked_to_worker`: set when restore-on-release is
// on and at least one volume has a pristine copy.
let warm_use_once = policy.restore_on_release
&& spawn_cfg.volumes.iter().any(|(_, _, _, p)| p.is_some());
let worker = warm_baked_to_worker(
bw,
spawn_cfg.memory_mib,
!spawn_cfg.mounts.is_empty() || !spawn_cfg.volumes.is_empty(),
spawn_cfg
.volumes
.iter()
.map(|(_, gp, _, _)| gp.clone())
.collect(),
spawn_cfg.vcpus,
warm_use_once,
);
if !spawn_cfg.mounts.is_empty() || !spawn_cfg.volumes.is_empty() {
let _ = drop_vfs_caches_via_agent(&worker.vsock_exec_path, spawn_cfg.memory_mib);
}
let _ = sync_time_via_agent(&worker.vsock_exec_path);
idle.push(IdleEntry {
worker,
last_used: Instant::now(),
});
extra_warm = 1;
} else {
extra_warm = 0;
}
// The warm worker counts toward `policy.min`; spawn the remainder, in
// parallel threads when more than one is needed.
let to_spawn = initial.saturating_sub(extra_warm);
if to_spawn == 1 {
idle.push(IdleEntry {
worker: spawn_cfg.spawn_one()?,
last_used: Instant::now(),
});
} else if to_spawn > 1 {
let mut handles = Vec::with_capacity(to_spawn);
for _ in 0..to_spawn {
let cfg = Arc::clone(&spawn_cfg);
handles.push(std::thread::spawn(move || cfg.spawn_one()));
}
for h in handles {
let w = h
.join()
.map_err(|_| Error::vm_msg("pool spawn thread panicked".to_owned()))??;
idle.push(IdleEntry {
worker: w,
last_used: Instant::now(),
});
}
}
// `alive` starts as the number of workers actually placed on the idle list.
let initial = idle.len();
let pool = Arc::new(HiddenPool {
state: Arc::new(Mutex::new(PoolState {
idle,
alive: initial,
waiting: 0,
reaped: 0,
})),
available: Arc::new(Condvar::new()),
dirty: Some(Arc::new(Mutex::new(VecDeque::new()))),
dirty_pending: Some(Arc::new(Condvar::new())),
socks_dir,
shutting_down: Arc::new(AtomicBool::new(false)),
spawn_cfg: Arc::clone(&spawn_cfg),
policy,
});
// Background threads: one replenisher always; restorers only with
// restore-on-release; a janitor unless the idle timeout is `Duration::MAX`.
let wait_handles = pool.wait_handles();
let h_replenish = {
let h = wait_handles.clone();
std::thread::Builder::new()
.name("supermachine-pool-replenish".into())
.spawn(move || replenisher_loop(h))
.map_err(|e| Error::vm_msg(format!("spawn replenisher thread: {e}")))?
};
let mut handles = vec![h_replenish];
if policy.restore_on_release {
// Half of `max` (rounded up), kept within 1..=4 and never above `max`
// (but at least 1).
let restorer_count = ((policy.max + 1) / 2).clamp(1, 4).min(policy.max.max(1));
for _ in 0..restorer_count {
let h = wait_handles.clone();
let h_restore = std::thread::Builder::new()
.name("supermachine-pool-restore".into())
.spawn(move || restorer_loop(h))
.map_err(|e| Error::vm_msg(format!("spawn restorer thread: {e}")))?;
handles.push(h_restore);
}
}
if pool.policy.idle_timeout != Duration::MAX {
let h = wait_handles.clone();
let h_janitor = std::thread::Builder::new()
.name("supermachine-pool-janitor".into())
.spawn(move || janitor_loop(h))
.map_err(|e| Error::vm_msg(format!("spawn janitor thread: {e}")))?;
handles.push(h_janitor);
}
// The join handles are dropped without joining; the threads keep running
// detached.
drop(handles);
Ok(pool)
}
/// Returns the pool bound to this image, building one with
/// `PoolPolicy::default()` on first use.
///
/// # Errors
/// Propagates any error from `build_pool_arc`.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn ensure_default_pool(&self, config: &VmConfig) -> Result<&Arc<HiddenPool>, Error> {
    if self.hidden_pool.get().is_none() {
        let fresh = self.build_pool_arc(config, PoolPolicy::default())?;
        // `set` fails only if the cell was filled in the meantime; either
        // way the cell is populated afterwards, so the result is ignored.
        let _ = self.hidden_pool.set(fresh);
    }
    let bound = self.hidden_pool.get();
    Ok(bound.expect("hidden pool just initialized"))
}
}
/// Cloneable bundle of shared handles passed to each pool background loop
/// (`replenisher_loop`, `restorer_loop`, `janitor_loop`).
#[derive(Clone)]
struct PoolWaitHandles {
/// Pool bookkeeping (idle workers plus the alive/waiting/reaped counters).
state: Arc<Mutex<PoolState>>,
/// Condvar notified when idle workers are added or the alive count changes.
available: Arc<Condvar>,
/// Queue of workers the restorer threads pop and restore; `None` makes
/// `restorer_loop` exit immediately.
dirty: Option<Arc<Mutex<VecDeque<Worker>>>>,
/// Condvar the restorer threads wait on while `dirty` is empty.
dirty_pending: Option<Arc<Condvar>>,
/// Flag polled by every background loop; when set the loops return.
shutting_down: Arc<AtomicBool>,
/// Configuration used to spawn replacement workers and to locate the snapshot.
spawn_cfg: Arc<SpawnConfig>,
/// Pool sizing/timeouts (the loops read `min` and `idle_timeout`).
policy: PoolPolicy,
}
impl HiddenPool {
    /// Clones the shared handles a background thread needs into a
    /// standalone [`PoolWaitHandles`] value.
    fn wait_handles(&self) -> PoolWaitHandles {
        let state = Arc::clone(&self.state);
        let available = Arc::clone(&self.available);
        let shutting_down = Arc::clone(&self.shutting_down);
        let spawn_cfg = Arc::clone(&self.spawn_cfg);
        PoolWaitHandles {
            state,
            available,
            // Cloning an `Option<Arc<_>>` bumps the refcount when present.
            dirty: self.dirty.clone(),
            dirty_pending: self.dirty_pending.clone(),
            shutting_down,
            spawn_cfg,
            policy: self.policy,
        }
    }
}
/// Background loop that takes workers off the dirty queue, restores each
/// one from the pool snapshot and returns it to the idle list.
///
/// Returns when the shutdown flag is set, when either the dirty queue or
/// its condvar is absent, or when the dirty-queue mutex is poisoned.
fn restorer_loop(h: PoolWaitHandles) {
    let (dirty, pending) = match (h.dirty.as_ref(), h.dirty_pending.as_ref()) {
        (Some(queue), Some(signal)) => (queue, signal),
        _ => return,
    };
    let poll = Duration::from_millis(100);
    while !h.shutting_down.load(Ordering::SeqCst) {
        // Block (in 100 ms slices so shutdown is noticed) until a dirty
        // worker is available.
        let mut worker = {
            let Ok(mut queue) = dirty.lock() else {
                return;
            };
            loop {
                if h.shutting_down.load(Ordering::SeqCst) {
                    return;
                }
                if let Some(next) = queue.pop_front() {
                    break next;
                }
                let Ok((reacquired, _)) = pending.wait_timeout(queue, poll) else {
                    return;
                };
                queue = reacquired;
            }
        };
        let snap_path = h.spawn_cfg.snapshot_path.clone();
        match worker.send_restore(&snap_path) {
            Ok(()) => {
                // Restored: hand the worker back to the idle list.
                if let Ok(mut pool_state) = h.state.lock() {
                    let entry = IdleEntry {
                        worker,
                        last_used: Instant::now(),
                    };
                    pool_state.idle.push(entry);
                    h.available.notify_all();
                }
            }
            Err(_) => {
                // Restore failed: retire the worker and free its slot.
                worker.shutdown();
                if let Ok(mut pool_state) = h.state.lock() {
                    pool_state.alive = pool_state.alive.saturating_sub(1);
                }
                h.available.notify_all();
            }
        }
    }
}
/// Background loop that spawns workers whenever the alive count is below
/// `policy.min`.
///
/// A slot is reserved (`alive += 1`) before spawning and given back if the
/// spawn fails, after which the loop backs off for 500 ms. Returns on
/// shutdown or when the state mutex is poisoned at one of the checked locks.
fn replenisher_loop(h: PoolWaitHandles) {
    let floor = h.policy.min;
    while !h.shutting_down.load(Ordering::SeqCst) {
        let Ok(view) = h.state.lock() else {
            return;
        };
        let below_floor = view.alive < floor;
        drop(view);

        if !below_floor {
            // Nothing to do: sleep on the condvar for at most 100 ms.
            let Ok(guard) = h.state.lock() else {
                return;
            };
            let _ = h.available.wait_timeout(guard, Duration::from_millis(100));
            continue;
        }

        // Re-check under the lock and reserve the slot before spawning.
        if let Ok(mut pool_state) = h.state.lock() {
            if pool_state.alive >= floor {
                continue;
            }
            pool_state.alive += 1;
        }

        match h.spawn_cfg.spawn_one() {
            Ok(fresh) => {
                if let Ok(mut pool_state) = h.state.lock() {
                    let entry = IdleEntry {
                        worker: fresh,
                        last_used: Instant::now(),
                    };
                    pool_state.idle.push(entry);
                    h.available.notify_all();
                }
            }
            Err(_) => {
                // Give the reserved slot back, then back off before retrying.
                if let Ok(mut pool_state) = h.state.lock() {
                    pool_state.alive = pool_state.alive.saturating_sub(1);
                }
                std::thread::sleep(Duration::from_millis(500));
            }
        }
    }
}
/// Background loop that prunes the idle list.
///
/// Each pass does two things under the state lock:
/// 1. removes idle workers whose child process has exited (or whose status
///    cannot be queried), decrementing `alive` and bumping `reaped`;
/// 2. while `alive > min`, removes idle entries from the front of the list
///    whose `last_used` is at least `idle_timeout` old.
///
/// Removed workers are shut down after the lock is released. Between passes
/// the loop waits `max(idle_timeout / 4, 100 ms)` in slices of at most
/// 100 ms so shutdown is noticed promptly.
///
/// Returns immediately when `idle_timeout` is `Duration::MAX`, returns on
/// shutdown, and returns if the state mutex is poisoned. The last case
/// matches `replenisher_loop`; without it neither lock attempt would ever
/// block and the thread would spin at full speed until shutdown.
fn janitor_loop(h: PoolWaitHandles) {
    let timeout = h.policy.idle_timeout;
    let min = h.policy.min;
    if timeout == Duration::MAX {
        return;
    }
    let tick = (timeout / 4).max(Duration::from_millis(100));
    let wait_unit = Duration::from_millis(100).min(tick);
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        let mut to_evict: Vec<Worker> = Vec::new();
        let mut reaped_now = 0u64;
        {
            let Ok(mut s) = h.state.lock() else {
                // Poisoned state: nothing useful can be done any more.
                return;
            };
            let now = Instant::now();

            // Pass 1: reap idle workers whose process is gone.
            let mut i = 0;
            while i < s.idle.len() {
                let dead = match s.idle[i].worker.child.try_wait() {
                    Ok(Some(_)) => true,
                    Ok(None) => false,
                    Err(_) => true,
                };
                if dead {
                    let entry = s.idle.remove(i);
                    s.alive = s.alive.saturating_sub(1);
                    s.reaped += 1;
                    reaped_now += 1;
                    to_evict.push(entry.worker);
                } else {
                    i += 1;
                }
            }

            // Pass 2: trim surplus workers that have idled past the timeout.
            while s.alive > min && !s.idle.is_empty() {
                let oldest = &s.idle[0];
                if now.duration_since(oldest.last_used) < timeout {
                    break;
                }
                let entry = s.idle.remove(0);
                s.alive -= 1;
                to_evict.push(entry.worker);
            }
        }
        if reaped_now > 0 {
            h.available.notify_all();
            eprintln!(
                "[pool-watchdog] reaped {reaped_now} dead idle worker(s) — pool will respawn \
                 toward min"
            );
        }
        // Shut workers down outside the lock.
        for mut w in to_evict {
            w.shutdown();
        }
        let mut remaining = tick;
        while remaining > Duration::ZERO && !h.shutting_down.load(Ordering::SeqCst) {
            let chunk = remaining.min(wait_unit);
            let Ok(s) = h.state.lock() else {
                return;
            };
            let _ = h.available.wait_timeout(s, chunk);
            remaining = remaining.saturating_sub(chunk);
        }
    }
}
/// A VM leased from a pool. Dereferences to [`Vm`]; dropping it hands the
/// underlying resources back to the pool (see the `Drop` impl).
pub struct PooledVm<'a> {
/// The leased VM; taken (set to `None`) during drop.
vm: Option<Vm>,
/// macOS/aarch64: the worker backing this VM, released to the pool on drop.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
worker: Option<Worker>,
/// macOS/aarch64: the pool the worker is released to.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pool_arc: Arc<HiddenPool>,
/// Linux/x86_64: the KVM pool this VM returns to; taken during drop.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pool: Option<Arc<KvmPoolInner>>,
/// Carries the `'a` lifetime without storing a reference to an `Image`.
_image: std::marker::PhantomData<&'a Image>,
}
impl std::ops::Deref for PooledVm<'_> {
    type Target = Vm;

    /// Borrows the leased VM.
    ///
    /// # Panics
    /// Panics if the VM slot has already been emptied.
    fn deref(&self) -> &Vm {
        let slot = self.vm.as_ref();
        slot.expect("PooledVm used after drop")
    }
}
impl std::ops::DerefMut for PooledVm<'_> {
    /// Mutably borrows the leased VM.
    ///
    /// # Panics
    /// Panics if the VM slot has already been emptied.
    fn deref_mut(&mut self) -> &mut Vm {
        let slot = self.vm.as_mut();
        slot.expect("PooledVm used after drop")
    }
}
impl PooledVm<'_> {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn snapshot(&self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
self.vm
.as_ref()
.ok_or_else(|| Error::vm_msg("PooledVm: no vm (already dropped?)".to_owned()))?
.snapshot_live(dest_dir)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn snapshot_diff(
&self,
dest_dir: impl Into<PathBuf>,
base_path: &Path,
) -> Result<Image, Error> {
self.vm
.as_ref()
.ok_or_else(|| Error::vm_msg("PooledVm: no vm (already dropped?)".to_owned()))?
.snapshot_diff_live(dest_dir, base_path)
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn snapshot(&self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
self.snapshot_with_opt_base(dest_dir, None)
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn snapshot_diff(
&self,
dest_dir: impl Into<PathBuf>,
base_path: &Path,
) -> Result<Image, Error> {
self.snapshot_with_opt_base(dest_dir, Some(base_path))
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn snapshot_with_opt_base(
&self,
dest_dir: impl Into<PathBuf>,
base_path: Option<&Path>,
) -> Result<Image, Error> {
let dest_dir = dest_dir.into();
let _span = tracing::info_span!(
"supermachine.snapshot",
dest_dir = %dest_dir.display(),
)
.entered();
let worker = self
.worker
.as_ref()
.ok_or_else(|| Error::vm_msg("PooledVm: no worker (already dropped?)".to_owned()))?;
let vm = self
.vm
.as_ref()
.ok_or_else(|| Error::vm_msg("PooledVm: no vm (already dropped?)".to_owned()))?;
let meta = vm.image_meta.clone().ok_or_else(|| {
Error::vm_msg(
"PooledVm::snapshot: image metadata missing (acquire from a 0.3.8+ Image)"
.to_owned(),
)
})?;
std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
let snap_path = dest_dir.join("restore.snap");
let parked = if meta.vcpus > 1 {
worker.send_smpark_park()?
} else {
false
};
let snap_result = match base_path {
Some(base) => worker.send_snapshot_with_base(&snap_path, base, true),
None => worker.send_snapshot(&snap_path),
};
if parked {
let _ = worker.send_smpark_unpark()?;
}
let _stats = snap_result?;
let metadata = serde_json::json!({
"memory_mib": meta.memory_mib,
"vcpus": meta.vcpus,
"layers": meta
.layers
.iter()
.map(|p| p.to_string_lossy().to_string())
.collect::<Vec<_>>(),
"delta_squashfs": meta
.delta_squashfs
.as_ref()
.map(|p| p.to_string_lossy().to_string()),
"snapshot_base": snap_path.to_string_lossy().to_string(),
"baked_at": chrono_rfc3339_now(),
"source": "PooledVm::snapshot",
});
std::fs::write(
dest_dir.join("metadata.json"),
serde_json::to_string_pretty(&metadata)
.map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
Image::from_snapshot(&dest_dir)
}
}
impl Drop for PooledVm<'_> {
    /// Returns the leased resources to the pool.
    ///
    /// * macOS/aarch64: drops the `Vm` handle and releases the worker to the
    ///   pool.
    /// * Linux/x86_64: if the idle list is below `min`, the VM is (optionally
    ///   reset to its snapshot and) parked in the idle list; otherwise it is
    ///   dropped and its admission slot released. A failed reset tears the
    ///   VM down and triggers an asynchronous refill.
    ///
    /// On Linux a pool that has been shut down never receives the VM back:
    /// after `Pool::shutdown` the idle list is empty, so parking the VM
    /// there would keep it alive with nobody able to acquire or release it.
    fn drop(&mut self) {
        #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
        {
            let _ = self.vm.take();
            if let Some(worker) = self.worker.take() {
                self.pool_arc.release(worker);
            }
        }
        #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
        if let Some(pool) = self.pool.take() {
            use std::sync::atomic::Ordering::SeqCst;
            let Some(vm) = self.vm.take() else {
                return;
            };
            // Keep the VM warm only while the pool is live and below `min`.
            let warm_slot = !pool.shut_down.load(SeqCst) && {
                let idle = pool.idle.lock().unwrap_or_else(|e| e.into_inner());
                idle.len() < pool.min
            };
            if !warm_slot {
                drop(vm);
                pool.admission.release();
                return;
            }
            if pool.restore_on_release {
                if let Err(e) = vm.reset_to_snapshot() {
                    eprintln!(
                        "supermachine: KVM in-place reset failed ({e}); \
                         tearing down this VM instead of reusing it"
                    );
                    drop(vm);
                    pool.admission.release();
                    pool.refill_async();
                    return;
                }
            }
            // Re-check under the idle lock: shutdown sets the flag before it
            // drains the list, so either the drain sees this VM or this
            // check sees the flag.
            let mut idle = pool.idle.lock().unwrap_or_else(|e| e.into_inner());
            if pool.shut_down.load(SeqCst) {
                drop(idle);
                drop(vm);
                pool.admission.release();
                return;
            }
            idle.push(vm);
            drop(idle);
            pool.admission.notify();
        } else {
            let _ = self.vm.take();
        }
    }
}
/// Builder that collects pool policy and per-VM configuration for an
/// [`Image`] and turns them into a [`Pool`] via `build()`.
pub struct PoolBuilder<'a> {
/// Image the pool will be bound to.
image: &'a Image,
/// Sizing and timeout policy edited by the `min`/`max`/`*_timeout` setters.
policy: PoolPolicy,
/// VM configuration edited by the `with_*` setters.
vm_config: VmConfig,
}
impl PoolBuilder<'_> {
    /// Sets the minimum number of workers/VMs the pool keeps around.
    pub fn min(mut self, n: usize) -> Self {
        self.policy.min = n;
        self
    }

    /// Sets the maximum pool size; values below 1 are raised to 1.
    pub fn max(mut self, n: usize) -> Self {
        self.policy.max = n.max(1);
        self
    }

    /// Sets how long a surplus idle worker may sit unused before eviction.
    pub fn idle_timeout(mut self, d: Duration) -> Self {
        self.policy.idle_timeout = d;
        self
    }

    /// Sets the maximum time `acquire` may wait for a free slot.
    pub fn acquire_timeout(mut self, d: Duration) -> Self {
        self.policy.acquire_timeout = Some(d);
        self
    }

    /// Removes the acquire timeout.
    pub fn no_acquire_timeout(mut self) -> Self {
        self.policy.acquire_timeout = None;
        self
    }

    /// Chooses whether a released VM is restored to its snapshot before reuse.
    pub fn restore_on_release(mut self, on: bool) -> Self {
        self.policy.restore_on_release = on;
        self
    }

    /// Forwards to `VmConfig::with_restore_timeout`.
    pub fn with_restore_timeout(mut self, timeout: Duration) -> Self {
        self.vm_config = std::mem::take(&mut self.vm_config).with_restore_timeout(timeout);
        self
    }

    /// Forwards to `VmConfig::with_memory_mib`.
    pub fn with_memory_mib(mut self, mib: u32) -> Self {
        self.vm_config = std::mem::take(&mut self.vm_config).with_memory_mib(mib);
        self
    }

    /// Forwards to `VmConfig::with_vcpus`.
    pub fn with_vcpus(mut self, vcpus: u32) -> Self {
        self.vm_config = std::mem::take(&mut self.vm_config).with_vcpus(vcpus);
        self
    }

    /// macOS/aarch64: builds the pool and binds it to the image.
    ///
    /// `min` is clamped to `max`. The image is checked for an existing pool
    /// *before* anything is spawned, mirroring the Linux path, so a doomed
    /// call no longer starts workers and background threads only to discard
    /// them. The check after building remains for the case where another
    /// caller binds a pool concurrently.
    ///
    /// # Errors
    /// Fails if the image already has a pool, or if building the pool fails.
    #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
    pub fn build(self) -> Result<Pool, Error> {
        let image = self.image;
        let already_bound = || {
            Error::vm_msg(
                "this Image already has a Pool — pool() can only be called \
                 once per Image handle.\n\n\
                 To run two independent pools off the same snapshot, load the \
                 Image twice via Image.fromSnapshot:\n\n \
                 const a = await Image.fromSnapshot(snapshotPath);\n \
                 const b = await Image.fromSnapshot(snapshotPath);\n \
                 const poolA = await a.pool(...);\n \
                 const poolB = await b.pool(...);\n\n\
                 If you called image.acquire() before image.pool(), the \
                 Image has been bound to a default pool whose policy can't \
                 be changed in place — either reorder the calls or use \
                 fromSnapshot() to get a fresh handle."
                    .to_owned(),
            )
        };
        // Fail fast: building the pool spawns workers and threads.
        if image.hidden_pool.get().is_some() {
            return Err(already_bound());
        }
        let mut policy = self.policy;
        if policy.min > policy.max {
            policy.min = policy.max;
        }
        let arc = image.build_pool_arc(&self.vm_config, policy)?;
        match image.hidden_pool.set(Arc::clone(&arc)) {
            Ok(()) => Ok(Pool { inner: arc }),
            Err(_) => Err(already_bound()),
        }
    }

    /// Linux/x86_64: builds a KVM-backed pool.
    ///
    /// Resolves (or creates) the warm snapshot for the image, clamps `min`
    /// to `max`, and pre-warms the pool up to `min` idle VMs.
    ///
    /// # Errors
    /// Fails if the image is already bound to a default pool, or if the warm
    /// snapshot cannot be prepared.
    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
    pub fn build(self) -> Result<Pool, Error> {
        if self.image.kvm_pool.get().is_some() {
            return Err(Error::vm_msg(
                "this Image already has a default Pool — image.acquire() was \
                 called before pool().build(), binding a default pool whose \
                 policy can't be changed in place. Reorder so pool().build() \
                 runs first, or load a fresh handle via Image::from_snapshot."
                    .to_owned(),
            ));
        }
        let warm = warm_snapshot_for_pool(self.image, &self.vm_config)?;
        let min = self.policy.min.min(self.policy.max.max(1));
        let inner = Arc::new(KvmPoolInner {
            image: warm,
            vm_config: self.vm_config,
            min,
            idle: std::sync::Mutex::new(Vec::new()),
            admission: Admission::new(self.policy.max, self.policy.acquire_timeout),
            restore_on_release: self.policy.restore_on_release,
            refilling: std::sync::atomic::AtomicBool::new(false),
            shut_down: std::sync::atomic::AtomicBool::new(false),
        });
        inner.prewarm_to_min();
        Ok(Pool { kvm: inner })
    }
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn prepare_kvm_volumes(
specs: &[crate::vmm::resources::VolumeSpec],
) -> Result<Vec<crate::kvm::run::VolumeAttach>, Error> {
let mut out = Vec::with_capacity(specs.len());
for spec in specs {
let path = std::path::Path::new(&spec.host_path);
let fresh = !path.exists();
if fresh {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(Error::Io)?;
}
let f = std::fs::File::create(path).map_err(Error::Io)?;
f.set_len(spec.size_bytes).map_err(Error::Io)?;
drop(f);
let status = std::process::Command::new("mke2fs")
.args(["-t", "ext4", "-F", "-q", "-E", "lazy_itable_init=1"])
.arg(path)
.status();
match status {
Ok(s) if s.success() => {}
Ok(s) => {
let _ = std::fs::remove_file(path);
return Err(Error::vm_msg(format!(
"mke2fs on volume {} exited with {s} (install e2fsprogs)",
spec.host_path
)));
}
Err(e) => {
let _ = std::fs::remove_file(path);
return Err(Error::vm_msg(format!(
"failed to run mke2fs for volume {}: {e} (install e2fsprogs)",
spec.host_path
)));
}
}
}
out.push(crate::kvm::run::VolumeAttach {
path: spec.host_path.clone(),
size: spec.size_bytes,
mount: spec.guest_path.clone(),
});
}
Ok(out)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn persist_kvm_builder_volumes(
snap_dir: &Path,
volumes: &[(PathBuf, String, u64)],
) -> Result<(), Error> {
let meta_path = snap_dir.join("metadata.json");
let text = std::fs::read_to_string(&meta_path).map_err(Error::Io)?;
let mut meta: serde_json::Value = serde_json::from_str(&text)
.map_err(|e| Error::vm_msg(format!("parse {}: {e}", meta_path.display())))?;
let arr: Vec<serde_json::Value> = volumes
.iter()
.map(|(host, guest, size)| {
serde_json::json!({
"host_file": host.to_string_lossy(),
"guest_path": guest,
"size_bytes": size,
})
})
.collect();
if let Some(obj) = meta.as_object_mut() {
obj.insert("volumes".into(), serde_json::json!(arr));
}
std::fs::write(
&meta_path,
serde_json::to_string_pretty(&meta)
.map_err(|e| Error::vm_msg(format!("serialize metadata: {e}")))?,
)
.map_err(Error::Io)?;
Ok(())
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn write_kvm_snapshot_metadata(
dest_dir: &Path,
snap_rel: &str,
base: Option<&Path>,
mem_size: usize,
num_cpus: u8,
) -> Result<(), Error> {
let mut meta = serde_json::json!({
"backend": "kvm",
"kvm_snapshot": snap_rel,
"memory_mib": (mem_size >> 20) as u32,
"vcpus": num_cpus as u32,
"baked_at": chrono_rfc3339_now(),
"source": "Vm::snapshot_live",
});
if let Some(b) = base {
meta["kvm_snapshot_base"] = serde_json::json!(b.to_string_lossy());
meta["source"] = serde_json::json!("Vm::snapshot_diff_live");
}
std::fs::write(
dest_dir.join("metadata.json"),
serde_json::to_string_pretty(&meta)
.map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
Ok(())
}
/// Returns an image that carries a KVM snapshot suitable for pooling.
///
/// If `image` already has a snapshot it is cloned and returned. Otherwise a
/// cached warm snapshot directory (keyed by image family, memory, vCPUs and
/// worker fingerprint) is looked up, once before and once after taking the
/// per-directory lock. On a miss a VM is booted, given 3.5 s to warm up,
/// snapshotted into a `.partial.<pid>` directory, post-processed, and
/// renamed into place.
///
/// # Errors
/// Propagates failures from booting, snapshotting, metadata finalization,
/// the final rename (unless the target directory exists by then) and from
/// loading the resulting snapshot.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn warm_snapshot_for_pool(image: &Image, config: &VmConfig) -> Result<Image, Error> {
    let has_snapshot = image
        .kvm
        .as_ref()
        .and_then(|k| k.snapshot.as_ref())
        .is_some();
    if has_snapshot {
        return Ok(image.clone());
    }

    let trace = crate::trace::enabled("pool");
    let mem = config.memory_mib.unwrap_or(image.memory_mib);
    let vcpus = config.vcpus.unwrap_or(image.vcpus);
    let snapshots_dir = default_snapshots_dir();
    let family = warm_family_key(image);
    let worker_fp = warm_worker_fingerprint();
    let warm_name = format!("{family}__kvmwarm__{mem}m{vcpus}v__{worker_fp}");
    let warm_dir = snapshots_dir.join(&warm_name);

    // Fast path: the warm snapshot is already cached.
    if let Ok(cached) = Image::from_snapshot(&warm_dir) {
        if trace {
            eprintln!("[warm-pool] cache hit: {}", warm_dir.display());
        }
        return Ok(cached);
    }

    // Take the lock, then look again in case another process built it
    // while this one was waiting.
    let _lock = WarmDirLock::acquire(&snapshots_dir, &warm_name);
    if let Ok(cached) = Image::from_snapshot(&warm_dir) {
        if trace {
            eprintln!("[warm-pool] cache hit (post-lock): {}", warm_dir.display());
        }
        return Ok(cached);
    }

    if trace {
        eprintln!(
            "[warm-pool] cold warm: boot + warmup -> {}",
            warm_dir.display()
        );
    }
    let vm = Vm::start(image, config)?;
    std::thread::sleep(std::time::Duration::from_millis(3500));

    let partial = snapshots_dir.join(format!("{warm_name}.partial.{}", std::process::id()));
    let _ = std::fs::remove_dir_all(&partial);
    let _ = vm.snapshot(&partial)?;
    finalize_warm_snapshot_metadata(&partial, &family, mem, &worker_fp)?;
    dedup_warm_against_golden(&snapshots_dir, &partial, mem, vcpus, &worker_fp, trace);

    if let Err(e) = std::fs::rename(&partial, &warm_dir) {
        // A failed rename is fine when the target directory exists anyway.
        let target_present = warm_dir.is_dir();
        let _ = std::fs::remove_dir_all(&partial);
        if !target_present {
            return Err(Error::Io(e));
        }
    }
    prune_stale_warm_artifacts(&snapshots_dir, &worker_fp);
    Image::from_snapshot(&warm_dir)
}
/// Deletes warm-pool artifacts that belong to a different worker fingerprint.
///
/// Two places are scanned:
/// * directories directly under `snapshots_dir` whose name contains
///   `__kvmwarm__` and whose last `__`-separated segment is not `current_fp`;
/// * `*.snap` files under `snapshots_dir/.kvmgolden` whose stem's last
///   `__`-separated segment is not `current_fp`.
///
/// Only entries whose modification time is at least 60 seconds old are
/// removed. All errors are ignored.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn prune_stale_warm_artifacts(snapshots_dir: &Path, current_fp: &str) {
    let grace = std::time::Duration::from_secs(60);
    let now = std::time::SystemTime::now();
    // True only when the mtime is readable and at least `grace` in the past.
    let old_enough = |p: &Path| -> bool {
        let Ok(modified) = std::fs::metadata(p).and_then(|m| m.modified()) else {
            return false;
        };
        match now.duration_since(modified) {
            Ok(age) => age >= grace,
            Err(_) => false,
        }
    };

    if let Ok(entries) = std::fs::read_dir(snapshots_dir) {
        for path in entries.flatten().map(|e| e.path()) {
            let Some(name) = path.file_name().and_then(|s| s.to_str()) else {
                continue;
            };
            let foreign_warm =
                name.contains("__kvmwarm__") && name.rsplit("__").next() != Some(current_fp);
            if foreign_warm && path.is_dir() && old_enough(&path) {
                let _ = std::fs::remove_dir_all(&path);
            }
        }
    }

    let golden_dir = snapshots_dir.join(".kvmgolden");
    if let Ok(entries) = std::fs::read_dir(&golden_dir) {
        for path in entries.flatten().map(|e| e.path()) {
            if path.extension().and_then(|s| s.to_str()) != Some("snap") {
                continue;
            }
            let fingerprint = path
                .file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| s.rsplit("__").next());
            let stale = matches!(fingerprint, Some(fp) if fp != current_fp);
            if stale && old_enough(&path) {
                let _ = std::fs::remove_file(&path);
            }
        }
    }
}
// Tests for `prune_stale_warm_artifacts`.
#[cfg(all(test, target_os = "linux", target_arch = "x86_64"))]
mod warm_gc_tests {
use super::*;
// Sets both access and modification time of `p` to `secs_ago` seconds
// before now, using `libc::utimes`. The return value of `utimes` is not
// checked.
fn set_mtime_secs_ago(p: &Path, secs_ago: i64) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let tv = libc::timeval {
tv_sec: now - secs_ago,
tv_usec: 0,
};
let times = [tv, tv];
let c = std::ffi::CString::new(p.to_str().unwrap()).unwrap();
unsafe {
libc::utimes(c.as_ptr(), times.as_ptr());
}
}
// Aged artifacts with a stale fingerprint are pruned; current-fingerprint
// artifacts and stale ones still inside the 60 s grace period are kept.
#[test]
fn prune_removes_stale_fp_keeps_current_and_fresh() {
let root = std::env::temp_dir().join(format!("sm-gc-test-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&root);
let golden_dir = root.join(".kvmgolden");
std::fs::create_dir_all(&golden_dir).unwrap();
let cur = "aaaa11112222";
let old = "bbbb33334444";
let warm_cur = root.join(format!("nginx__kvmwarm__512m1v__{cur}"));
let warm_old = root.join(format!("nginx__kvmwarm__512m1v__{old}"));
let warm_old_fresh = root.join(format!("alpine__kvmwarm__512m1v__{old}"));
for d in [&warm_cur, &warm_old, &warm_old_fresh] {
std::fs::create_dir_all(d).unwrap();
std::fs::write(d.join("vm.snap"), b"x").unwrap();
}
let gold_cur = golden_dir.join(format!("512m1v__{cur}.snap"));
let gold_old = golden_dir.join(format!("512m1v__{old}.snap"));
std::fs::write(&gold_cur, b"x").unwrap();
std::fs::write(&gold_old, b"x").unwrap();
// Age everything except `warm_old_fresh` past the grace period.
for p in [&warm_cur, &warm_old, &gold_cur, &gold_old] {
set_mtime_secs_ago(p, 120);
}
set_mtime_secs_ago(&warm_old_fresh, 1);
prune_stale_warm_artifacts(&root, cur);
assert!(warm_cur.is_dir(), "current-fp warm dir must be kept");
assert!(
!warm_old.exists(),
"stale-fp warm dir (aged) must be pruned"
);
assert!(
warm_old_fresh.is_dir(),
"stale-fp warm dir within grace must be kept"
);
assert!(gold_cur.exists(), "current-fp golden must be kept");
assert!(!gold_old.exists(), "stale-fp golden (aged) must be pruned");
let _ = std::fs::remove_dir_all(&root);
}
}
/// Derives a filesystem-safe family name for an image's warm snapshots.
///
/// The name is taken from the directory containing `image.snapshot_path`
/// (or from the path itself when it has no parent), with every character
/// other than ASCII alphanumerics, `-`, `_` and `.` replaced by `_`. When
/// that yields an empty string, `.` or `..`, a name of the form
/// `img-<16 hex digits>` derived from a hash of the snapshot path is used
/// instead.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn warm_family_key(image: &Image) -> String {
    let dir = image.snapshot_path.parent().unwrap_or(&image.snapshot_path);
    let raw = dir.file_name().and_then(|s| s.to_str()).unwrap_or("");
    let sanitized: String = raw
        .chars()
        .map(|c| match c {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' => c,
            _ => '_',
        })
        .collect();
    if !matches!(sanitized.as_str(), "" | "." | "..") {
        return sanitized;
    }
    // Unusable directory name: fall back to a hash of the full path.
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    image.snapshot_path.hash(&mut hasher);
    format!("img-{:016x}", hasher.finish())
}
/// Returns a short fingerprint of the worker binary named by the
/// `SUPERMACHINE_WORKER_BIN` environment variable.
///
/// The fingerprint is the first 12 characters of whatever
/// `current_worker_sha16` returns for that path, or the literal
/// `"noworker"` when the variable is unset or no hash is available.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn warm_worker_fingerprint() -> String {
    let Some(bin) = std::env::var_os("SUPERMACHINE_WORKER_BIN") else {
        return "noworker".to_owned();
    };
    let worker_path = PathBuf::from(bin);
    match current_worker_sha16(&worker_path) {
        Some(sha) => sha.chars().take(12).collect::<String>(),
        None => "noworker".to_owned(),
    }
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn finalize_warm_snapshot_metadata(
partial: &Path,
family: &str,
mem: u32,
worker_fp: &str,
) -> Result<(), Error> {
let meta_path = partial.join("metadata.json");
let text = std::fs::read_to_string(&meta_path).map_err(Error::Io)?;
let mut meta: serde_json::Value =
serde_json::from_str(&text).map_err(|e| Error::vm_msg(format!("warm metadata: {e}")))?;
let obj = meta
.as_object_mut()
.ok_or_else(|| Error::vm_msg("warm metadata is not an object".to_owned()))?;
obj.insert("kvm_snapshot".into(), serde_json::json!("vm.snap"));
obj.insert("image".into(), serde_json::json!(family));
obj.insert("memory_mib".into(), serde_json::json!(mem));
obj.insert(
"baked_by_version".into(),
serde_json::json!(env!("CARGO_PKG_VERSION")),
);
obj.insert("warm_worker_fp".into(), serde_json::json!(worker_fp));
obj.insert("source".into(), serde_json::json!("warm_snapshot_for_pool"));
std::fs::write(
&meta_path,
serde_json::to_string_pretty(&meta)
.map_err(|e| Error::vm_msg(format!("warm metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
Ok(())
}
/// Best-effort: rewrites `partial/vm.snap` as a diff against a shared
/// "golden" snapshot and records the golden path in the metadata.
///
/// Does nothing when `SUPERMACHINE_AUTO_DEDUP` is `"0"` or the golden
/// directory cannot be created. If no golden snapshot exists for this
/// memory / vCPU / worker-fingerprint combination, the warm snapshot is
/// hard-linked (or, failing that, copied and renamed) into place first.
/// Every failure is swallowed; with `trace` set, the outcome is logged to
/// stderr.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn dedup_warm_against_golden(
    snapshots_dir: &Path,
    partial: &Path,
    mem: u32,
    vcpus: u32,
    worker_fp: &str,
    trace: bool,
) {
    if std::env::var("SUPERMACHINE_AUTO_DEDUP").as_deref() == Ok("0") {
        return;
    }
    let warm_snap = partial.join("vm.snap");
    let golden_dir = snapshots_dir.join(".kvmgolden");
    if std::fs::create_dir_all(&golden_dir).is_err() {
        return;
    }
    let golden = golden_dir.join(format!("{mem}m{vcpus}v__{worker_fp}.snap"));
    let _lock = WarmDirLock::acquire(&golden_dir, &format!("{mem}m{vcpus}v__{worker_fp}"));

    // Seed the golden snapshot if it is missing: hard link first, then
    // copy-to-temp + rename as the fallback.
    if !golden.exists() && std::fs::hard_link(&warm_snap, &golden).is_err() {
        let tmp = golden.with_extension("snap.tmp");
        let seeded =
            std::fs::copy(&warm_snap, &tmp).is_ok() && std::fs::rename(&tmp, &golden).is_ok();
        if !seeded {
            let _ = std::fs::remove_file(&tmp);
            return;
        }
    }

    match crate::kvm::run::rewrite_full_as_diff(&warm_snap, &golden) {
        Ok(diff_size) => {
            let meta_path = partial.join("metadata.json");
            let parsed = std::fs::read_to_string(&meta_path)
                .ok()
                .and_then(|text| serde_json::from_str::<serde_json::Value>(&text).ok());
            if let Some(mut meta) = parsed {
                if let Some(obj) = meta.as_object_mut() {
                    obj.insert(
                        "kvm_snapshot_base".into(),
                        serde_json::json!(golden.to_string_lossy()),
                    );
                    if let Ok(rendered) = serde_json::to_string_pretty(&meta) {
                        let _ = std::fs::write(&meta_path, rendered);
                    }
                }
            }
            if trace {
                eprintln!(
                    "[warm-pool] deduped vs golden {}: diff {:.1} MiB",
                    golden.display(),
                    diff_size as f64 / 1048576.0
                );
            }
        }
        Err(e) => {
            if trace {
                eprintln!("[warm-pool] dedup skipped ({e}); warm snapshot kept full");
            }
        }
    }
}
/// Guard returned by `WarmDirLock::acquire`.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
struct WarmDirLock {
/// The lock file on which `flock` succeeded, or `None` when the lock could
/// not be taken (in which case callers proceed without it). Never read;
/// it is held so the file stays open for the guard's lifetime.
_file: Option<std::fs::File>,
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
impl WarmDirLock {
    /// Tries to take an exclusive `flock` on `dir/.<name>.warmlock`.
    ///
    /// The lock is polled non-blockingly every 25 ms for up to 120 s. This
    /// never fails: if the directory or lock file cannot be created, or the
    /// deadline passes, a guard holding no file is returned and the caller
    /// carries on unlocked.
    fn acquire(dir: &Path, name: &str) -> Self {
        use std::os::fd::AsRawFd;

        if std::fs::create_dir_all(dir).is_err() {
            return Self { _file: None };
        }
        let lock_path = dir.join(format!(".{name}.warmlock"));
        let opened = std::fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&lock_path);
        let file = match opened {
            Ok(file) => file,
            Err(_) => return Self { _file: None },
        };

        let retry_every = std::time::Duration::from_millis(25);
        let give_up_at = std::time::Instant::now() + std::time::Duration::from_secs(120);
        loop {
            // SAFETY: the descriptor comes from `file`, which is open and
            // stays alive for the duration of the call; `flock` does not
            // touch any memory owned by this process.
            let rc = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
            if rc == 0 {
                return Self { _file: Some(file) };
            }
            if std::time::Instant::now() > give_up_at {
                return Self { _file: None };
            }
            std::thread::sleep(retry_every);
        }
    }
}
/// Handle to a VM pool. Cloning yields another handle to the same pool,
/// since each backend field is an `Arc`.
#[derive(Clone)]
pub struct Pool {
/// macOS/aarch64 backend.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
inner: Arc<HiddenPool>,
/// Linux/x86_64 KVM backend.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
kvm: Arc<KvmPoolInner>,
}
/// Shared state of a Linux/x86_64 KVM pool.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
struct KvmPoolInner {
/// Image every pooled VM is started from.
image: Image,
/// Configuration passed to `Vm::start` for every pooled VM.
vm_config: VmConfig,
/// Number of idle VMs the pool tries to keep ready.
min: usize,
/// VMs currently parked and ready to be handed out.
idle: Mutex<Vec<Vm>>,
/// Bounds the number of live VMs and lets acquirers wait for a slot.
admission: Admission,
/// Whether a returned VM is reset to its snapshot before being re-parked.
restore_on_release: bool,
/// Set while a background refill thread is running, so only one runs.
refilling: std::sync::atomic::AtomicBool,
/// Set by `Pool::shutdown`; acquisition fails once this is true.
shut_down: std::sync::atomic::AtomicBool,
}
/// Counting gate that caps the number of live VMs at `max` and lets
/// callers wait for the count to change.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
struct Admission {
    /// Upper bound on `alive`; always at least 1.
    max: usize,
    /// Number of slots currently reserved.
    alive: std::sync::atomic::AtomicUsize,
    /// Mutex paired with `available`; it guards no data of its own.
    lock: std::sync::Mutex<()>,
    /// Signalled by `release` and `notify`.
    available: std::sync::Condvar,
    /// Maximum time an acquirer is prepared to wait, if any.
    acquire_timeout: Option<Duration>,
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
impl Admission {
    /// Creates a gate with no slots reserved; a `max` of 0 is treated as 1.
    fn new(max: usize, acquire_timeout: Option<Duration>) -> Self {
        let max = if max == 0 { 1 } else { max };
        Self {
            max,
            alive: std::sync::atomic::AtomicUsize::new(0),
            lock: std::sync::Mutex::new(()),
            available: std::sync::Condvar::new(),
            acquire_timeout,
        }
    }

    /// Number of slots currently reserved.
    fn alive(&self) -> usize {
        self.alive.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Atomically reserves one slot; returns `false` when already at `max`.
    fn try_reserve(&self) -> bool {
        use std::sync::atomic::Ordering::SeqCst;
        let limit = self.max;
        self.alive
            .fetch_update(SeqCst, SeqCst, |cur| (cur < limit).then_some(cur + 1))
            .is_ok()
    }

    /// Waits for a wake-up, for at most 100 ms and never past `deadline`.
    ///
    /// Returns `Ok(false)` without waiting when `deadline` has already been
    /// reached, otherwise `Ok(true)` once the wait ends. Callers re-check
    /// their condition afterwards either way.
    fn wait_for_change(&self, deadline: Option<std::time::Instant>) -> Result<bool, Error> {
        let cap = Duration::from_millis(100);
        let guard = self.lock.lock().unwrap_or_else(|e| e.into_inner());
        let slice = if let Some(dl) = deadline {
            let now = std::time::Instant::now();
            if dl <= now {
                return Ok(false);
            }
            std::cmp::min(dl - now, cap)
        } else {
            cap
        };
        let _ = self.available.wait_timeout(guard, slice);
        Ok(true)
    }

    /// Gives one reserved slot back and wakes a single waiter.
    fn release(&self) {
        use std::sync::atomic::Ordering::SeqCst;
        self.alive.fetch_sub(1, SeqCst);
        self.available.notify_one();
    }

    /// Wakes every waiter without changing the slot count.
    fn notify(&self) {
        self.available.notify_all();
    }
}
// Unit tests for `Admission`.
#[cfg(all(test, target_os = "linux", target_arch = "x86_64"))]
mod admission_tests {
use super::Admission;
use std::sync::Arc;
use std::time::{Duration, Instant};
// Reservations succeed up to `max`, fail beyond it, and succeed again
// after a release.
#[test]
fn try_reserve_bounds_at_max() {
let a = Admission::new(3, None);
assert!(a.try_reserve());
assert!(a.try_reserve());
assert!(a.try_reserve());
assert_eq!(a.alive(), 3);
assert!(!a.try_reserve());
assert_eq!(a.alive(), 3);
a.release();
assert_eq!(a.alive(), 2);
assert!(a.try_reserve());
assert_eq!(a.alive(), 3);
}
// A requested maximum of 0 behaves like 1.
#[test]
fn max_zero_clamps_to_one() {
let a = Admission::new(0, None);
assert_eq!(a.max, 1);
assert!(a.try_reserve());
assert!(!a.try_reserve());
}
// A deadline in the past returns `false` immediately and leaves the
// reserved count untouched.
#[test]
fn wait_for_change_reports_deadline_passed() {
let a = Admission::new(1, Some(Duration::from_millis(50)));
assert!(a.try_reserve());
let past = Instant::now() - Duration::from_secs(1);
let start = Instant::now();
assert_eq!(a.wait_for_change(Some(past)).unwrap(), false);
assert!(
start.elapsed() < Duration::from_millis(50),
"a passed deadline must not block"
);
assert_eq!(a.alive(), 1);
}
// A `notify` from another thread ends the wait well before the 100 ms cap.
#[test]
fn wait_for_change_wakes_on_notify() {
let a = Arc::new(Admission::new(1, Some(Duration::from_secs(5))));
assert!(a.try_reserve());
let peer = Arc::clone(&a);
let waker = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(20));
peer.notify();
});
let start = Instant::now();
let deadline = Some(Instant::now() + Duration::from_secs(5));
assert_eq!(a.wait_for_change(deadline).unwrap(), true);
let elapsed = start.elapsed();
waker.join().unwrap();
assert!(
elapsed < Duration::from_millis(100),
"notify must wake the waiter promptly, waited {elapsed:?}"
);
}
// Without any notification a single wait still returns after roughly
// 100 ms even though the deadline is far away.
#[test]
fn wait_for_change_caps_each_wait_without_notify() {
let a = Admission::new(1, Some(Duration::from_secs(10)));
assert!(a.try_reserve());
let start = Instant::now();
let deadline = Some(Instant::now() + Duration::from_secs(10));
assert_eq!(a.wait_for_change(deadline).unwrap(), true);
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(250),
"each wait must cap at ~100ms, waited {elapsed:?}"
);
}
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
impl KvmPoolInner {
    /// Starts one VM from the pool's image and configuration.
    fn restore_one(&self) -> Result<Vm, Error> {
        Vm::start(&self.image, &self.vm_config)
    }

    /// Hands out an idle VM, or starts a new one if a slot can be reserved;
    /// otherwise waits for a change and retries.
    ///
    /// # Errors
    /// Returns a pool-exhausted error when the pool is shut down or when no
    /// slot frees up before the acquire timeout. Also propagates a failed
    /// VM start, after giving the reserved slot back.
    fn take_or_restore(&self) -> Result<Vm, Error> {
        let deadline = self
            .admission
            .acquire_timeout
            .map(|d| std::time::Instant::now() + d);
        loop {
            if self.shut_down.load(std::sync::atomic::Ordering::SeqCst) {
                return Err(Error::pool_exhausted("pool is shut down"));
            }
            if let Some(vm) = self.idle.lock().unwrap_or_else(|e| e.into_inner()).pop() {
                return Ok(vm);
            }
            if self.admission.try_reserve() {
                return self.restore_one().inspect_err(|_| self.admission.release());
            }
            if !self.admission.wait_for_change(deadline)? {
                return Err(Error::pool_exhausted(format!(
                    "pool saturated: all {} VM slots in use and none freed within \
                     acquire_timeout",
                    self.admission.max
                )));
            }
        }
    }

    /// Starts VMs until `min` are idle, no slot can be reserved, a start
    /// fails, or the pool is shut down.
    ///
    /// The shutdown flag is checked before each start and again under the
    /// idle lock before parking. `Pool::shutdown` sets the flag before it
    /// drains the idle list, so a VM that finishes starting afterwards is
    /// torn down rather than left in a list nobody will drain again.
    fn prewarm_to_min(&self) {
        use std::sync::atomic::Ordering::SeqCst;
        loop {
            if self.shut_down.load(SeqCst) {
                break;
            }
            let idle_len = self.idle.lock().unwrap_or_else(|e| e.into_inner()).len();
            if idle_len >= self.min || !self.admission.try_reserve() {
                break;
            }
            match self.restore_one() {
                Ok(vm) => {
                    let mut idle = self.idle.lock().unwrap_or_else(|e| e.into_inner());
                    if self.shut_down.load(SeqCst) {
                        drop(idle);
                        drop(vm);
                        self.admission.release();
                        break;
                    }
                    idle.push(vm);
                    drop(idle);
                    self.admission.notify();
                }
                Err(_) => {
                    self.admission.release();
                    break;
                }
            }
        }
    }

    /// Tops the idle list back up to `min` on a background thread.
    ///
    /// Does nothing when `min` is 0 or a refill is already running. The
    /// thread runs the same loop as `prewarm_to_min` and clears the
    /// `refilling` flag when it finishes.
    fn refill_async(self: &Arc<Self>) {
        use std::sync::atomic::Ordering::SeqCst;
        if self.min == 0 {
            return;
        }
        if self
            .refilling
            .compare_exchange(false, true, SeqCst, SeqCst)
            .is_err()
        {
            // Another refill thread is already at work.
            return;
        }
        let inner = Arc::clone(self);
        std::thread::spawn(move || {
            inner.prewarm_to_min();
            inner.refilling.store(false, SeqCst);
        });
    }
}
impl std::fmt::Debug for Pool {
    /// Formats as an opaque `Pool { .. }`; no internal state is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("Pool");
        opaque.finish_non_exhaustive()
    }
}
/// Point-in-time counters returned by `Pool::stats`.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
/// Workers/VMs currently counted as alive by the pool.
pub alive: usize,
/// `alive` minus `idle`, saturating at zero.
pub in_use: usize,
/// Workers/VMs parked and ready to be acquired.
pub idle: usize,
/// Waiter count from the pool state (always 0 on the Linux backend).
pub waiting: usize,
/// Configured upper bound on pool size.
pub max: usize,
/// Configured lower bound on pool size.
pub min: usize,
/// Idle workers removed because their process had exited (always 0 on
/// the Linux backend).
pub reaped: u64,
/// Footprint value in MiB read from the spawn configuration (always 0 on
/// the Linux backend).
pub observed_footprint_mib: u64,
}
impl Pool {
    /// macOS/aarch64: leases a worker from the pool and wraps it in a
    /// [`PooledVm`] whose `Vm` points at the worker's vsock paths.
    ///
    /// # Errors
    /// Propagates any error from acquiring a worker.
    #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
    pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
        let cfg = &self.inner.spawn_cfg;
        let _span = tracing::info_span!(
            "supermachine.pool.acquire",
            memory_mib = cfg.memory_mib,
            vcpus = cfg.vcpus,
        )
        .entered();
        let worker = self.inner.acquire()?;
        let image_meta = Arc::new(ImageMeta {
            memory_mib: cfg.memory_mib,
            vcpus: cfg.vcpus,
            layers: cfg.layers.clone(),
            delta_squashfs: cfg.delta_squashfs.clone(),
        });
        let vm = Vm {
            pool: None,
            vsock_mux_path: worker.vsock_mux_path.clone(),
            vsock_exec_path: worker.vsock_exec_path.clone(),
            own_vsock_mux_dir: None,
            skip_cleanup: true,
            image_meta: Some(image_meta),
        };
        Ok(PooledVm {
            vm: Some(vm),
            worker: Some(worker),
            pool_arc: Arc::clone(&self.inner),
            _image: std::marker::PhantomData,
        })
    }

    /// Linux/x86_64: leases an idle VM or starts a new one.
    ///
    /// # Errors
    /// Propagates any error from `take_or_restore` (pool shut down, pool
    /// saturated past the acquire timeout, or a failed VM start).
    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
    pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
        let leased = self.kvm.take_or_restore()?;
        let owner = Arc::clone(&self.kvm);
        Ok(PooledVm {
            vm: Some(leased),
            pool: Some(owner),
            _image: std::marker::PhantomData,
        })
    }

    /// macOS/aarch64: reports the pool's current counters. If the state
    /// mutex is poisoned, the state-derived counters read as zero.
    #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
    pub fn stats(&self) -> PoolStats {
        let (alive, idle, waiting, reaped) = match self.inner.state.lock() {
            Ok(s) => (s.alive, s.idle.len(), s.waiting, s.reaped),
            Err(_) => (0, 0, 0, 0),
        };
        let observed_footprint_mib = self
            .inner
            .spawn_cfg
            .observed_footprint_mib
            .load(std::sync::atomic::Ordering::Relaxed);
        let policy = &self.inner.policy;
        PoolStats {
            alive,
            in_use: alive.saturating_sub(idle),
            idle,
            waiting,
            max: policy.max,
            min: policy.min,
            reaped,
            observed_footprint_mib,
        }
    }

    /// Linux/x86_64: reports the pool's current counters. `waiting`,
    /// `reaped` and `observed_footprint_mib` are always zero here.
    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
    pub fn stats(&self) -> PoolStats {
        let idle = {
            let parked = self.kvm.idle.lock().unwrap_or_else(|e| e.into_inner());
            parked.len()
        };
        let alive = self.kvm.admission.alive();
        PoolStats {
            alive,
            in_use: alive.saturating_sub(idle),
            idle,
            waiting: 0,
            max: self.kvm.admission.max,
            min: self.kvm.min,
            reaped: 0,
            observed_footprint_mib: 0,
        }
    }

    /// Shuts the pool down.
    ///
    /// On macOS this delegates to the pool's own shutdown routine. On Linux
    /// it sets the shutdown flag, empties the idle list, and for each
    /// drained VM releases its admission slot and drops it.
    pub fn shutdown(&self) {
        #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
        self.inner.shutdown_pool();
        #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
        {
            self.kvm
                .shut_down
                .store(true, std::sync::atomic::Ordering::SeqCst);
            let drained: Vec<Vm> = {
                let mut parked = self.kvm.idle.lock().unwrap_or_else(|e| e.into_inner());
                std::mem::take(&mut *parked)
            };
            for vm in drained {
                self.kvm.admission.release();
                drop(vm);
            }
        }
    }
}
/// Builder that turns an OCI image reference into a bootable `Image`
/// via `build()`. Each field is filled by the `with_*` method of the same
/// (or similar) name; unset options fall back to defaults at build time.
pub struct OciImageBuilder {
// Image reference; rewritten to an `oci-archive:` / `oci-layout:` form by
// `with_oci_archive` / `with_oci_layout`.
image: String,
// Snapshot name override; when `None` the name is derived from the image ref.
name: Option<String>,
pull_policy: PullPolicy,
memory_mib: Option<u32>,
vcpus: Option<u32>,
guest_port: Option<u16>,
// Replacement command line for the workload (argv form).
cmd: Option<Vec<String>>,
// Environment variables as (key, value) pairs.
envs: Vec<(String, String)>,
// Directory holding baked snapshots; defaults via `default_snapshots_dir`.
snapshots_dir: Option<PathBuf>,
// One-shot warmup callback run against the VM during the bake.
warmup: Option<Box<dyn FnOnce(&Vm) -> Result<(), Error> + Send>>,
// Tag used to name the warm snapshot directory (`<name>__warm__<tag>`).
warmup_tag: Option<String>,
// Extra files as (host_path, guest_path) pairs.
extra_files: Vec<(PathBuf, String)>,
// Mounts as (host_path, guest_tag, symlink policy, guest_path).
mounts: Vec<(
PathBuf,
String,
crate::vmm::resources::SymlinkPolicy,
String,
)>,
// Volumes as (host_path, guest_path, size_bytes).
volumes: Vec<(PathBuf, String, u64)>,
extra_kernel_args: Vec<String>,
// When `Some(true)`, the bake is told a listener is required.
require_listener: Option<bool>,
// Target platform string, e.g. "linux/amd64"; the HVF build defaults to "linux/arm64".
pub(crate) platform: Option<String>,
}
impl OciImageBuilder {
/// Creates a builder for `image_ref` with every option unset and the
/// default pull policy.
pub fn new(image_ref: impl Into<String>) -> Self {
    let image: String = image_ref.into();
    Self {
        image,
        name: None,
        pull_policy: PullPolicy::default(),
        memory_mib: None,
        vcpus: None,
        guest_port: None,
        cmd: None,
        envs: Vec::new(),
        snapshots_dir: None,
        warmup: None,
        warmup_tag: None,
        extra_files: Vec::new(),
        mounts: Vec::new(),
        volumes: Vec::new(),
        extra_kernel_args: Vec::new(),
        require_listener: None,
        platform: None,
    }
}
pub fn with_platform(mut self, platform: impl Into<String>) -> Self {
self.platform = Some(platform.into());
self
}
/// Queues one extra file, recorded as a `(host_path, guest_path)` pair.
pub fn with_extra_file(
    mut self,
    host_path: impl Into<PathBuf>,
    guest_path: impl Into<String>,
) -> Self {
    let entry: (PathBuf, String) = (host_path.into(), guest_path.into());
    self.extra_files.push(entry);
    self
}
/// Points the builder at an OCI archive on disk.
///
/// The image reference becomes `oci-archive:<path>`. If no explicit name
/// was set, the snapshot name is derived from the image reference that
/// was in place before the swap.
pub fn with_oci_archive(mut self, archive_path: impl Into<PathBuf>) -> Self {
    let archive: PathBuf = archive_path.into();
    let archive_ref = format!("oci-archive:{}", archive.display());
    let previous_ref = std::mem::replace(&mut self.image, archive_ref);
    if self.name.is_none() {
        let derived = crate::bake::snapshot_name_for_image(&previous_ref);
        self.name = Some(derived);
    }
    self
}
/// Points the builder at an OCI layout directory on disk.
///
/// The image reference becomes `oci-layout:<path>`. If no explicit name
/// was set, the snapshot name is derived from the image reference that
/// was in place before the swap.
pub fn with_oci_layout(mut self, layout_dir: impl Into<PathBuf>) -> Self {
    let layout: PathBuf = layout_dir.into();
    let layout_ref = format!("oci-layout:{}", layout.display());
    let previous_ref = std::mem::replace(&mut self.image, layout_ref);
    if self.name.is_none() {
        let derived = crate::bake::snapshot_name_for_image(&previous_ref);
        self.name = Some(derived);
    }
    self
}
/// Adds a host directory mount using the default symlink policy.
///
/// Equivalent to [`Self::with_mount_symlinks`] with
/// `SymlinkPolicy::default()`.
pub fn with_mount(
    self,
    host_path: impl Into<PathBuf>,
    guest_tag: impl Into<String>,
    guest_path: impl Into<String>,
) -> Self {
    let default_policy = crate::vmm::resources::SymlinkPolicy::default();
    self.with_mount_symlinks(host_path, guest_tag, guest_path, default_policy)
}
/// Adds a host directory mount with an explicit symlink policy.
///
/// Stored as `(host_path, guest_tag, symlinks, guest_path)`.
pub fn with_mount_symlinks(
    mut self,
    host_path: impl Into<PathBuf>,
    guest_tag: impl Into<String>,
    guest_path: impl Into<String>,
    symlinks: crate::vmm::resources::SymlinkPolicy,
) -> Self {
    let host: PathBuf = host_path.into();
    let tag: String = guest_tag.into();
    let mount_point: String = guest_path.into();
    self.mounts.push((host, tag, symlinks, mount_point));
    self
}
/// Adds a volume, flattening the spec into
/// `(host_path, guest_path, size_bytes)`.
pub fn with_volume(mut self, spec: crate::vmm::resources::VolumeSpec) -> Self {
    let host = PathBuf::from(spec.host_path);
    let entry = (host, spec.guest_path, spec.size_bytes);
    self.volumes.push(entry);
    self
}
/// Appends one extra kernel command-line token.
pub fn with_extra_kernel_arg(mut self, arg: impl Into<String>) -> Self {
    let token: String = arg.into();
    self.extra_kernel_args.push(token);
    self
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vcpus = Some(vcpus);
self
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_pull_policy(mut self, policy: PullPolicy) -> Self {
self.pull_policy = policy;
self
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.memory_mib = Some(mib);
self
}
pub fn with_guest_port(mut self, port: u16) -> Self {
self.guest_port = Some(port);
self
}
/// Replaces the workload command with the given argv.
pub fn with_cmd<I, S>(mut self, cmd: I) -> Self
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let argv: Vec<String> = cmd.into_iter().map(|part| part.into()).collect();
    self.cmd = Some(argv);
    self
}
/// Appends one environment variable as a `(key, value)` pair.
pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
    let pair: (String, String) = (key.into(), value.into());
    self.envs.push(pair);
    self
}
pub fn with_snapshots_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.snapshots_dir = Some(dir.into());
self
}
pub fn with_warmup<F>(mut self, warmup: F) -> Self
where
F: FnOnce(&Vm) -> Result<(), Error> + Send + 'static,
{
self.warmup = Some(Box::new(warmup));
self
}
pub fn with_warmup_tag(mut self, tag: impl Into<String>) -> Self {
self.warmup_tag = Some(tag.into());
self
}
pub fn with_listener_required(mut self) -> Self {
self.require_listener = Some(true);
self
}
/// Builds the image with the backend selected at compile time:
/// `build_kvm` on Linux/x86_64, `build_hvf` everywhere else.
pub fn build(self) -> Result<Image, Error> {
    #[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
    {
        self.build_hvf()
    }
    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
    {
        self.build_kvm()
    }
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn build_kvm(self) -> Result<Image, Error> {
let snapshots_dir = self
.snapshots_dir
.clone()
.unwrap_or_else(default_snapshots_dir);
let derived_name = self
.name
.clone()
.unwrap_or_else(|| crate::bake::snapshot_name_for_image(&self.image));
if !self.envs.is_empty()
|| self.cmd.is_some()
|| !self.mounts.is_empty()
|| !self.extra_files.is_empty()
|| self.warmup.is_some()
{
eprintln!(
"supermachine(KVM): OciImageBuilder env/cmd/mounts/warmup are not yet \
applied on the KVM backend; baking the base image for {:?}.",
self.image
);
}
let img = Image::from_oci_to_dir(
&self.image,
self.pull_policy,
&snapshots_dir,
Some(&derived_name),
)?;
if self.volumes.is_empty() {
return Ok(img);
}
let snap_dir = snapshots_dir.join(&derived_name);
persist_kvm_builder_volumes(&snap_dir, &self.volumes)?;
Image::from_snapshot(&snap_dir)
}
/// Non-KVM build path: assembles a `BakeRequest` from the builder options
/// and runs the pipelined bake, with or without a user warmup callback.
///
/// Outline, in execution order:
/// 1. Resolve the snapshots dir and snapshot name; probe the cached snapshot
///    and invalidate it when its metadata names a different crate version.
/// 2. `PullPolicy::Never`: return the cached image, or a cache-invalid /
///    cache-miss error — never bake.
/// 3. Translate env, extra files, mounts (plus implicit Rosetta mounts for
///    `linux/amd64`), volumes, kernel args and the listener flag into
///    `extra_args` for the bake tool.
/// 4. Without a warmup: run the pipelined bake with a no-op callback and
///    return the pending image.
/// 5. With a warmup: reuse an up-to-date warm snapshot if one loads;
///    otherwise bake, run the callback, and load the warm snapshot.
///
/// # Errors
/// Cache errors under `PullPolicy::Never`, a bake error when the Rosetta
/// runtime is missing or `cmd` cannot be encoded, the warmup callback's own
/// error when it failed, or the bake failure mapped by `map_bake_error`.
#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
fn build_hvf(self) -> Result<Image, Error> {
let snapshots_dir = self.snapshots_dir.unwrap_or_else(default_snapshots_dir);
let derived_name = self
.name
.clone()
.unwrap_or_else(|| crate::bake::snapshot_name_for_image(&self.image));
let snap_dir = snapshots_dir.join(&derived_name);
// A snapshot counts as cached only if it can actually be loaded.
let cache_loadable = Image::from_snapshot(&snap_dir).is_ok();
if cache_loadable {
if let Some(baked) = snap_dir_baked_under_other_version(&snap_dir) {
let current = env!("CARGO_PKG_VERSION");
eprintln!(
"supermachine: snapshot at {} was baked under v{baked}; \
current binaries are v{current}. Auto-rebaking \
(this is a one-time cost on the next build after upgrade).",
snap_dir.display()
);
invalidate_stale_snapshot_tree(&snap_dir);
}
}
// Re-check the directory: the invalidation above may have removed it.
let cache_loadable = cache_loadable && snap_dir.is_dir();
// NOTE(review): this policy check runs after the stale-version invalidation,
// so under `PullPolicy::Never` a loadable but version-mismatched snapshot is
// deleted and then reported as a cache miss — confirm this is intended.
match self.pull_policy {
PullPolicy::Never => {
if cache_loadable {
return Image::from_snapshot(&snap_dir);
}
let restore_snap = snap_dir.join("restore.snap");
if restore_snap.is_file() {
return Err(Error::cache_invalid(format!(
"snapshot present at {} but not loadable on this binary; \
rebake required (PullPolicy::Never won't auto-rebake)",
snap_dir.display()
)));
}
return Err(Error::cache_miss(format!(
"no cached snapshot for {} at {} (PullPolicy::Never)",
self.image,
snap_dir.display()
)));
}
_ => {}
}
// Translate builder options into command-line style arguments for the bake.
let mut extra_args: Vec<String> = Vec::new();
for (k, v) in &self.envs {
extra_args.push("--env".to_owned());
extra_args.push(format!("{k}={v}"));
}
for (host, guest) in &self.extra_files {
extra_args.push("--extra-file".to_owned());
extra_args.push(format!("{}:{}", host.display(), guest));
}
use crate::vmm::resources::SymlinkPolicy;
let mut effective_mounts: Vec<(PathBuf, String, SymlinkPolicy, String)> =
self.mounts.clone();
let platform_amd64 = self.platform.as_deref() == Some("linux/amd64");
// amd64 guests get an implicit "rosetta" mount unless the caller supplied one.
if platform_amd64
&& !effective_mounts
.iter()
.any(|(_, tag, _, _)| tag == "rosetta")
{
const ROSETTA_HOST: &str = "/Library/Apple/usr/libexec/oah/RosettaLinux";
let rosetta_path = PathBuf::from(ROSETTA_HOST);
if !rosetta_path.join("rosetta").is_file() || !rosetta_path.join("rosettad").is_file() {
return Err(Error::bake_msg(format!(
"platform=linux/amd64 requires Apple Rosetta runtime at \
{ROSETTA_HOST}; install with `softwareupdate --install-rosetta`"
)));
}
effective_mounts.push((
rosetta_path,
"rosetta".to_owned(),
SymlinkPolicy::Opaque,
"/run/rosetta".to_owned(),
));
}
// Likewise an implicit "rosettad-cache" mount under $HOME; best-effort only —
// failing to create the directory warns and skips the mount.
if platform_amd64
&& !effective_mounts
.iter()
.any(|(_, tag, _, _)| tag == "rosettad-cache")
{
if let Some(home) = std::env::var_os("HOME") {
let cache_dir = PathBuf::from(home)
.join(".cache")
.join("supermachine")
.join("rosetta-aot");
if let Err(e) = std::fs::create_dir_all(&cache_dir) {
eprintln!(
"supermachine: could not create Rosetta AOT cache dir \
{}: {} — amd64 binaries will still run via JIT but \
translations won't persist across VM restarts",
cache_dir.display(),
e
);
} else {
effective_mounts.push((
cache_dir,
"rosettad-cache".to_owned(),
SymlinkPolicy::Opaque,
"/var/cache/rosettad".to_owned(),
));
}
}
}
// Mount spec is `host:tag:guest_path`, with a `:deny` / `:follow` suffix for
// the non-opaque symlink policies.
for (host, tag, symlinks, guest_path) in &effective_mounts {
extra_args.push("--mount".to_owned());
let s = match symlinks {
SymlinkPolicy::Opaque => {
format!("{}:{}:{}", host.display(), tag, guest_path)
}
SymlinkPolicy::Deny => {
format!("{}:{}:{}:deny", host.display(), tag, guest_path)
}
SymlinkPolicy::Follow => {
format!("{}:{}:{}:follow", host.display(), tag, guest_path)
}
};
extra_args.push(s);
}
for (host, guest, size_bytes) in &self.volumes {
extra_args.push("--volume".to_owned());
extra_args.push(format!("{}:{}:{}", host.display(), guest, size_bytes));
}
for token in &self.extra_kernel_args {
extra_args.push("--cmdline-extra".to_owned());
extra_args.push(token.clone());
}
if self.require_listener.unwrap_or(false) {
extra_args.push("--supermachine-listener-required".to_owned());
}
// The command override travels as a JSON-encoded argv array.
let cmd_override = match &self.cmd {
Some(argv) => Some(
serde_json::to_string(argv)
.map_err(|e| Error::bake_msg(format!("encode cmd: {e}")))?,
),
None => None,
};
let root = repo_root_for_bake()?;
// Defaults applied here: guest port 80, 256 MiB, 1 vCPU, platform linux/arm64.
let request = crate::bake::BakeRequest {
image: self.image.clone(),
name: self.name.clone(),
runtime: "supermachine".to_owned(),
guest_port: self.guest_port.unwrap_or(80),
memory_mib: self.memory_mib.unwrap_or(256),
vcpus: self.vcpus.unwrap_or(1),
pull_policy: self.pull_policy.as_bake_str().to_owned(),
snapshots_dir: snapshots_dir.clone(),
cmd_override,
extra_args,
platform: self
.platform
.clone()
.unwrap_or_else(|| "linux/arm64".to_owned()),
};
// ---- No user warmup: pipelined bake with a no-op callback. ----
if self.warmup.is_none() {
let trace = crate::trace::enabled("bake");
let bake_t0 = std::time::Instant::now();
let use_pre_exec = !self.require_listener.unwrap_or(false);
let pipelined = crate::bake::PipelinedWarmup {
warm_dir: snapshots_dir.join(format!("{}__warm__unused", derived_name)),
warm_tag: "unused".to_owned(),
keep_alive: true,
skip_warm_snapshot: true,
use_pre_exec_trigger: use_pre_exec,
callback: Box::new(|_ctx| Ok(())),
};
match crate::bake::run_push_pipelined(&request, bake_t0, &root, pipelined) {
Ok(warm_handoff) => {
if trace {
eprintln!(
"[bake-trace] always-pipelined (skip-warm) total: {:?} (bg save \
may still be in flight)",
bake_t0.elapsed()
);
}
let img = Image::from_snapshot_pending(&snap_dir)?;
// Hand the still-running worker (if any) over to the image.
if let Some(bw) = warm_handoff {
*img.warm_baked_worker.inner.lock().unwrap() = Some(bw);
}
return Ok(img);
}
Err(msg) => {
return Err(map_bake_error(&request.image, msg));
}
}
}
// ---- User warmup path. ----
// Cannot panic: the `is_none()` branch above returns on every path.
let warmup = self.warmup.unwrap();
let tag = self.warmup_tag.as_deref().unwrap_or("default");
let warm_dir = snapshots_dir.join(format!("{}__warm__{}", derived_name, tag));
// Reuse a loadable warm snapshot unless it was baked by another version, in
// which case both the warm and the base snapshot trees are invalidated.
if let Ok(image) = Image::from_snapshot(&warm_dir) {
if let Some(baked) = snap_dir_baked_under_other_version(&warm_dir) {
let current = env!("CARGO_PKG_VERSION");
eprintln!(
"supermachine: warm snapshot at {} was baked under v{baked}; \
current binaries are v{current}. Auto-rebaking.",
warm_dir.display()
);
invalidate_stale_snapshot_tree(&warm_dir);
invalidate_stale_snapshot_tree(&snap_dir);
} else {
return Ok(image);
}
}
let trace = crate::trace::enabled("bake");
let bake_t0 = std::time::Instant::now();
// Side channel carrying the callback's typed `Error` out of the bake, which
// itself only reports a message string.
let warmup_err: std::sync::Arc<std::sync::Mutex<Option<Error>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let warmup_err_inner = warmup_err.clone();
// Records when the callback started, for the trace output below.
let warmup_t0_capture: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let warmup_t0_inner = warmup_t0_capture.clone();
let pipelined = crate::bake::PipelinedWarmup {
warm_dir: warm_dir.clone(),
warm_tag: tag.to_owned(),
keep_alive: true,
skip_warm_snapshot: false,
use_pre_exec_trigger: !self.require_listener.unwrap_or(false),
callback: Box::new(move |ctx| {
if let Ok(mut g) = warmup_t0_inner.lock() {
*g = Some(std::time::Instant::now());
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
{
// Synthetic handle onto the bake's VM: no owned pool, cleanup skipped.
let synth_vm = Vm {
pool: None,
vsock_mux_path: ctx.vsock_mux_path.clone(),
vsock_exec_path: ctx.vsock_exec_path.clone(),
own_vsock_mux_dir: None,
skip_cleanup: true,
image_meta: None,
};
let result = warmup(&synth_vm);
drop(synth_vm);
match result {
Ok(()) => Ok(()),
Err(e) => {
let msg = e.to_string();
if let Ok(mut g) = warmup_err_inner.lock() {
*g = Some(e);
}
Err(msg)
}
}
}
#[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
{
// Other targets: the warmup is not run; the captures are only referenced.
let _ = (&ctx, &warmup, &warmup_err_inner);
Ok(())
}
}),
};
match crate::bake::run_push_pipelined(&request, bake_t0, &root, pipelined) {
Ok(warm_handoff) => {
if trace {
eprintln!("[bake-trace] pipelined bake total: {:?}", bake_t0.elapsed());
if let Some(t0) = warmup_t0_capture.lock().ok().and_then(|g| *g) {
eprintln!("[bake-trace] (warmup ran at +{:?})", t0 - bake_t0);
}
}
let img = Image::from_snapshot(&warm_dir)?;
if let Some(bw) = warm_handoff {
*img.warm_baked_worker.inner.lock().unwrap() = Some(bw);
}
Ok(img)
}
Err(msg) => {
// Prefer the callback's original error over the stringified bake failure.
if let Some(e) = warmup_err.lock().ok().and_then(|mut g| g.take()) {
return Err(e);
}
Err(map_bake_error(&request.image, msg))
}
}
}
}
fn snapshot_baked_under_other_version(meta: &serde_json::Value) -> Option<String> {
let current = env!("CARGO_PKG_VERSION");
let baked = meta
.get("baked_by_version")
.and_then(|v| v.as_str())
.map(|s| s.to_owned())
.or_else(|| {
meta.get("kernel")
.and_then(|v| v.as_str())
.and_then(parse_version_segment)
})?;
if baked == current {
None
} else {
Some(baked)
}
}
/// Reads `metadata.json` from `snap_dir` and applies
/// `snapshot_baked_under_other_version` to it. An unreadable or unparsable
/// metadata file yields `None`.
fn snap_dir_baked_under_other_version(snap_dir: &Path) -> Option<String> {
    let meta_path = snap_dir.join("metadata.json");
    let raw = std::fs::read_to_string(meta_path).ok()?;
    let parsed = serde_json::from_str::<serde_json::Value>(&raw).ok()?;
    snapshot_baked_under_other_version(&parsed)
}
/// Deletes a snapshot directory and every sibling whose name starts with
/// `<snapshot-name>__warm__`.
///
/// Best-effort: a missing directory is ignored silently; any other removal
/// failure is reported on stderr and does not stop the sweep.
fn invalidate_stale_snapshot_tree(snap_dir: &Path) {
    match std::fs::remove_dir_all(snap_dir) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => eprintln!(
            "supermachine: warning: failed to remove stale snapshot {}: {e}",
            snap_dir.display()
        ),
    }

    let (Some(parent), Some(base_name)) = (
        snap_dir.parent(),
        snap_dir.file_name().and_then(|s| s.to_str()),
    ) else {
        return;
    };
    let warm_prefix = format!("{base_name}__warm__");
    let Ok(listing) = std::fs::read_dir(parent) else {
        return;
    };

    let warm_siblings = listing
        .flatten()
        .map(|entry| entry.path())
        .filter(|candidate| {
            candidate
                .file_name()
                .and_then(|s| s.to_str())
                .is_some_and(|n| n.starts_with(&warm_prefix))
        });
    for path in warm_siblings {
        match std::fs::remove_dir_all(&path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => eprintln!(
                "supermachine: warning: failed to remove stale warm snapshot {}: {e}",
                path.display()
            ),
        }
    }
}
/// Prints a re-bake warning on stderr when the snapshot metadata names a
/// version other than the current crate version; silent otherwise.
fn warn_if_snapshot_version_mismatch(meta: &serde_json::Value, snapshot_path: &Path) {
    if let Some(baked) = snapshot_baked_under_other_version(meta) {
        let current = env!("CARGO_PKG_VERSION");
        eprintln!(
            "supermachine: warning: snapshot at {} was baked under \
             v{baked}; current binaries are v{current}. Re-bake \
             (delete the snapshot dir and rerun `supermachine run`) \
             to pick up init/kernel fixes shipped since v{baked}.",
            snapshot_path.display()
        );
    }
}
/// Extracts a version from the first path component shaped like `vX.Y.Z`.
///
/// A component qualifies when it starts with `v` and the remainder is at
/// least three dot-separated segments, each a non-empty run of ASCII
/// digits. The returned string omits the leading `v`.
///
/// Empty segments are rejected: `"".chars().all(..)` is vacuously true, so
/// without the explicit check components such as `v1..2` or `v..` would be
/// reported as versions.
fn parse_version_segment(kernel_path: &str) -> Option<String> {
    Path::new(kernel_path).components().find_map(|comp| {
        let rest = comp.as_os_str().to_str()?.strip_prefix('v')?;
        let mut segments = 0usize;
        for seg in rest.split('.') {
            if seg.is_empty() || !seg.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            segments += 1;
        }
        // Two dots minimum, i.e. at least major.minor.patch.
        (segments >= 3).then(|| rest.to_owned())
    })
}
/// Maps the textual policy names `deny`, `opaque` and `follow` to a
/// `SymlinkPolicy`; any other string yields `None`.
fn parse_symlink_policy(s: &str) -> Option<crate::vmm::resources::SymlinkPolicy> {
    use crate::vmm::resources::SymlinkPolicy;
    let policy = match s {
        "follow" => SymlinkPolicy::Follow,
        "opaque" => SymlinkPolicy::Opaque,
        "deny" => SymlinkPolicy::Deny,
        _ => return None,
    };
    Some(policy)
}
/// Removes orphaned AOT cache files from `cache_dir` and returns how many
/// were deleted.
///
/// Two kinds of regular files are evicted:
/// * hidden `.*.aotcache.partial` files, whatever their size;
/// * non-hidden `*.aotcache` files that are zero bytes long.
///
/// An unreadable directory counts as nothing to do (returns 0). A summary
/// line is printed to stderr when at least one file was removed.
fn scrub_aotcache_orphans(cache_dir: &Path) -> usize {
    use std::os::unix::ffi::OsStrExt;
    let Ok(listing) = std::fs::read_dir(cache_dir) else {
        return 0;
    };
    let (mut evicted_zero, mut evicted_partial) = (0usize, 0usize);
    for ent in listing.flatten() {
        let name = ent.file_name();
        let raw = name.as_bytes();
        let hidden = raw.starts_with(b".");
        let is_partial = hidden && raw.ends_with(b".aotcache.partial");
        let is_final = !hidden && raw.ends_with(b".aotcache");
        if !(is_final || is_partial) {
            continue;
        }
        let path = ent.path();
        let md = match std::fs::metadata(&path) {
            Ok(md) if md.is_file() => md,
            _ => continue,
        };
        if is_partial {
            if std::fs::remove_file(&path).is_ok() {
                evicted_partial += 1;
            }
        } else if md.len() == 0 && std::fs::remove_file(&path).is_ok() {
            evicted_zero += 1;
        }
    }
    let evicted = evicted_zero + evicted_partial;
    if evicted != 0 {
        eprintln!(
            "supermachine: scrubbed {} AOT cache orphan(s) from {} \
             ({} zero-byte aotcache, {} mid-flight partial) \u{2014} \
             would have trapped rosetta with the segment_count \
             assertion or SIGBUS on mmap past EOF",
            evicted,
            cache_dir.display(),
            evicted_zero,
            evicted_partial,
        );
    }
    evicted
}
/// Resolves the default snapshots directory.
///
/// `$SUPERMACHINE_SNAPSHOTS` wins when set; otherwise the result is
/// `$HOME/.local/supermachine-snapshots`, with `.` standing in for an
/// unset `HOME`.
fn default_snapshots_dir() -> PathBuf {
    match std::env::var_os("SUPERMACHINE_SNAPSHOTS") {
        Some(explicit) => PathBuf::from(explicit),
        None => {
            let base = match std::env::var_os("HOME") {
                Some(home) => PathBuf::from(home),
                None => PathBuf::from("."),
            };
            base.join(".local/supermachine-snapshots")
        }
    }
}
fn repo_root_for_bake() -> Result<PathBuf, Error> {
if let Some(root) = std::env::var_os("SUPERMACHINE_ROOT") {
return Ok(PathBuf::from(root));
}
let exe = std::env::current_exe().map_err(|e| Error::bake_msg(format!("current_exe: {e}")))?;
for ancestor in exe.ancestors() {
if ancestor.join("tools/supermachine-push").is_file() {
return Ok(ancestor.to_path_buf());
}
if ancestor.join("share/supermachine/kernel").is_file() {
return Ok(ancestor.to_path_buf());
}
}
std::env::current_dir().map_err(|e| Error::bake_msg(format!("current_dir: {e}")))
}
fn map_bake_error(image: &str, msg: String) -> Error {
if let Some(rest) = msg.strip_prefix("KERNEL_PANIC|") {
let mut parts = rest.splitn(2, '|');
let first_line = parts.next().unwrap_or("").to_owned();
let stack: Vec<String> = parts
.next()
.map(|s| s.split('\x1F').map(ToOwned::to_owned).collect())
.unwrap_or_default();
return Error::kernel_panic(first_line, stack);
}
let lc = msg.to_ascii_lowercase();
let has_status = |code: u16| -> bool {
let needle = format!("http {code}");
lc.contains(&needle)
};
if has_status(404)
|| lc.contains("manifest unknown")
|| lc.contains("name unknown")
|| lc.contains("not found")
&& (lc.contains("registry") || lc.contains("manifest") || lc.contains("image"))
{
return Error::image_not_found(image, msg);
}
if has_status(401)
|| has_status(403)
|| lc.contains("unauthorized")
|| lc.contains("forbidden")
|| lc.contains("auth challenge")
{
return Error::registry_auth(image, msg);
}
if lc.contains("could not resolve")
|| lc.contains("dns")
|| lc.contains("connection refused")
|| lc.contains("connection reset")
|| lc.contains("network is unreachable")
|| lc.contains("ssl_connect")
|| lc.contains("tls handshake")
|| lc.contains("curl: (6)") || lc.contains("curl: (7)") || lc.contains("curl: (28)") || lc.contains("curl: (35)") || lc.contains("curl: (56)")
{
return Error::registry_unreachable(msg);
}
if lc.contains("registry") || lc.contains("manifest") || lc.contains("docker pull") {
Error::network_msg(msg)
} else {
Error::bake_msg(msg)
}
}
/// Per-VM start options. Every field is optional or empty by default;
/// `Vm::start` falls back to the image's own values where applicable.
#[derive(Debug, Clone, Default)]
pub struct VmConfig {
// Overrides the image's memory size (MiB) when set.
memory_mib: Option<u32>,
// Overrides the image's vCPU count when set.
vcpus: Option<u32>,
// Explicit asset paths; the macOS start path otherwise calls `AssetPaths::discover()`.
assets: Option<AssetPaths>,
// Directory for the vsock mux socket; the macOS start path defaults to the temp dir.
vsock_mux_dir: Option<PathBuf>,
// Restore timeout; the macOS start path defaults to 10 seconds.
restore_timeout: Option<Duration>,
// Volumes to attach (on KVM: cold boot only; ignored with a warning on snapshot restore).
volumes: Vec<crate::vmm::resources::VolumeSpec>,
// virtio-fs mounts to attach (on KVM: cold boot only; ignored with a warning on snapshot restore).
virtiofs: Vec<crate::vmm::resources::MountSpec>,
// Forwarded to the KVM cold-boot VM config as `enable_balloon`.
enable_balloon: bool,
}
/// Chainable setters for [`VmConfig`]; each consumes the config and returns
/// the updated value.
impl VmConfig {
    /// Creates a config with every option at its default.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Overrides the guest memory size, in MiB.
    pub fn with_memory_mib(self, mib: u32) -> Self {
        let mut cfg = self;
        cfg.memory_mib = Some(mib);
        cfg
    }

    /// Overrides the vCPU count.
    pub fn with_vcpus(self, vcpus: u32) -> Self {
        let mut cfg = self;
        cfg.vcpus = Some(vcpus);
        cfg
    }

    /// Turns the balloon flag on or off.
    pub fn with_balloon(self, enable: bool) -> Self {
        let mut cfg = self;
        cfg.enable_balloon = enable;
        cfg
    }

    /// Appends a volume to attach.
    pub fn with_volume(self, spec: crate::vmm::resources::VolumeSpec) -> Self {
        let mut cfg = self;
        cfg.volumes.push(spec);
        cfg
    }

    /// Appends a virtio-fs mount to attach.
    pub fn with_virtiofs(self, spec: crate::vmm::resources::MountSpec) -> Self {
        let mut cfg = self;
        cfg.virtiofs.push(spec);
        cfg
    }

    /// Supplies explicit asset paths.
    pub fn with_assets(self, assets: AssetPaths) -> Self {
        let mut cfg = self;
        cfg.assets = Some(assets);
        cfg
    }

    /// Sets the directory in which the vsock mux socket is created.
    pub fn with_vsock_mux_dir(self, dir: impl Into<PathBuf>) -> Self {
        let mut cfg = self;
        cfg.vsock_mux_dir = Some(dir.into());
        cfg
    }

    /// Sets how long to wait for the snapshot restore.
    pub fn with_restore_timeout(self, timeout: Duration) -> Self {
        let mut cfg = self;
        cfg.restore_timeout = Some(timeout);
        cfg
    }
}
/// Handle to a micro-VM (macOS / HVF backend).
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub struct Vm {
// Owned warm pool; `Some` for VMs created by `Vm::start`, `None` for the
// synthetic handles built for pooled workers and bake warmups.
pool: Option<WarmPool>,
// Unix socket path returned by `vsock_path()` and dialed by `connect()`.
vsock_mux_path: PathBuf,
// Unix socket path used for exec and control requests.
vsock_exec_path: PathBuf,
// Socket directory created by `Vm::start` itself, if it had to create one.
own_vsock_mux_dir: Option<PathBuf>,
// Set on synthetic handles; presumably tells cleanup to leave the sockets alone — TODO confirm.
skip_cleanup: bool,
// Image parameters captured at start; `Vm::snapshot` refuses to run without them.
image_meta: Option<Arc<ImageMeta>>,
}
/// Handle to a micro-VM (Linux / KVM backend).
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub struct Vm {
// Running VM handle; taken out by `stop()`.
running: Option<crate::kvm::run::RunningVm>,
// Unix socket path returned by `vsock_path()` and dialed by `connect()`.
vsock_mux_path: PathBuf,
// Unix socket path used for exec and control requests.
vsock_exec_path: PathBuf,
// Always `None` when built by `Vm::start` on this backend.
own_vsock_mux_dir: Option<PathBuf>,
// `false` when built by `Vm::start`; presumably disables socket cleanup when set — TODO confirm.
skip_cleanup: bool,
// Shared with the background time-sync thread, which exits once this is `true`.
time_sync_stop: Arc<AtomicBool>,
}
/// Placeholder `Vm` for unsupported targets: the `Infallible` field means
/// no value of this type can ever be constructed.
#[cfg(not(any(
all(target_os = "macos", target_arch = "aarch64"),
all(target_os = "linux", target_arch = "x86_64")
)))]
pub struct Vm {
_never: std::convert::Infallible,
}
/// Image parameters a `Vm` keeps after start (memory, vCPUs and the block
/// devices it was started with).
#[derive(Clone, Debug)]
pub(crate) struct ImageMeta {
/// Guest memory size in MiB.
pub memory_mib: u32,
/// vCPU count.
pub vcpus: u32,
/// Layer paths; the macOS start path attaches each one as a block device.
pub layers: Vec<PathBuf>,
/// Optional delta squashfs, attached as a block device after the layers.
pub delta_squashfs: Option<PathBuf>,
}
#[cfg(any(
all(target_os = "macos", target_arch = "aarch64"),
all(target_os = "linux", target_arch = "x86_64")
))]
impl Vm {
/// Starts a VM from `image` by restoring its snapshot (macOS / HVF backend).
///
/// Steps: check the HVF entitlement, resolve the kernel (the one bundled
/// with the image wins over the discovered assets), pick the vsock socket
/// paths, assemble the `VmResources`, start a `WarmPool` and restore the
/// snapshot into it within the configured timeout (10 s by default).
///
/// # Errors
/// Returns a VM error when the entitlement check fails, an assets error when
/// no kernel can be found, an I/O error when the socket directory cannot be
/// created, and converted pool errors when the pool start or the restore fails.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn start(image: &Image, config: &VmConfig) -> Result<Vm, Error> {
#[cfg(target_os = "macos")]
if let Err(msg) = crate::codesign::check_self_has_hvf_entitlement() {
return Err(Error::vm_msg(msg));
}
let assets = match &config.assets {
Some(a) => a.clone(),
None => AssetPaths::discover(),
};
// Kernel precedence: image-bundled kernel, then asset kernel, else error.
let kernel: PathBuf = if let Some(k) = image.bundled_kernel.as_ref() {
k.clone()
} else if let Some(k) = assets.kernel.as_ref() {
k.clone()
} else {
return Err(Error::assets_msg(
"no kernel found: snapshot dir has no bundled kernel and AssetPaths::discover() came up empty; set VmConfig::with_assets() or $SUPERMACHINE_ASSETS_DIR".to_owned(),
));
};
let kernel = kernel.as_path();
let dir = match &config.vsock_mux_dir {
Some(d) => d.clone(),
None => std::env::temp_dir(),
};
// Remember the directory only when this call created it.
let mut own_dir = None;
if !dir.is_dir() {
std::fs::create_dir_all(&dir).map_err(Error::Io)?;
own_dir = Some(dir.clone());
}
// Socket name combines the process id with a unique suffix.
let vsock_mux_path = dir.join(format!(
"supermachine-vm-{}-{}.sock",
std::process::id(),
unique_suffix(),
));
// The exec socket sits next to the mux socket, with "-exec" appended to its name.
let vsock_exec_path = {
let mut p = vsock_mux_path.clone();
let mut name = p.file_name().unwrap().to_owned();
name.push("-exec");
p.set_file_name(name);
p
};
// Config overrides take precedence over the image's recorded sizing.
let memory_mib = config.memory_mib.unwrap_or(image.memory_mib);
let vcpus = config.vcpus.unwrap_or(image.vcpus);
let mut resources = VmResources::new()
.with_kernel_path(kernel.to_string_lossy().to_string())
.with_memory_mib(memory_mib as usize)
.with_vcpus(vcpus)
.with_cow_restore(true)
.with_restore(image.snapshot_path.to_string_lossy().to_string())
.with_vsock_mux(vsock_mux_path.to_string_lossy().to_string())
.with_vsock_exec(vsock_exec_path.to_string_lossy().to_string());
// Block devices: every layer in order, then the optional delta squashfs.
for layer in &image.layers {
resources = resources.with_block_device(layer.to_string_lossy().to_string());
}
if let Some(delta) = &image.delta_squashfs {
resources = resources.with_block_device(delta.to_string_lossy().to_string());
}
let options = RunOptions::default();
let pool = WarmPool::start(resources, options).map_err(Error::from)?;
let timeout = config
.restore_timeout
.unwrap_or_else(|| Duration::from_secs(10));
// Only success matters here; the restore's return value is discarded.
let _ = pool
.restore_timeout(image.snapshot_path.to_string_lossy().to_string(), timeout)
.map_err(Error::from)?;
Ok(Vm {
pool: Some(pool),
vsock_mux_path,
vsock_exec_path,
own_vsock_mux_dir: own_dir,
skip_cleanup: false,
image_meta: Some(Arc::new(ImageMeta {
memory_mib,
vcpus,
layers: image.layers.clone(),
delta_squashfs: image.delta_squashfs.clone(),
})),
})
}
/// Starts a VM from `image` (Linux / KVM backend).
///
/// When the image carries a snapshot it is restored from that file, and any
/// volumes or virtio-fs mounts in `config` are ignored with a warning on
/// stderr. Otherwise the VM is cold-booted from the image's kernel and
/// initrd with the config's volumes (plus the image's own volumes for guest
/// paths not already covered) and virtio-fs mounts.
///
/// After the VM exists, the exec bridge and the TSI mux are started and a
/// background thread retries the guest clock sync for up to 30 seconds.
///
/// # Errors
/// Returns a VM error when the image has no KVM artifacts, when restore,
/// kernel/initrd reads, VM construction or the exec bridge fail, and
/// propagates the error from `prepare_kvm_volumes`.
///
/// # Panics
/// Panics on the cold-boot path if the image has no kernel or initrd path
/// (the `expect` messages state these are validated in `from_snapshot`).
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn start(image: &Image, config: &VmConfig) -> Result<Vm, Error> {
use crate::kvm::run::{LinuxVm, LinuxVmConfig};
let parts = image.kvm.as_ref().ok_or_else(|| {
Error::vm_msg(
"Image has no KVM artifacts: metadata.json needs \"backend\":\"kvm\" \
with kvm_kernel + kvm_initrd (produced by the KVM bake path)."
.to_owned(),
)
})?;
let vm = if let Some(snap) = &parts.snapshot {
// Snapshot restore: config volumes and mounts are not applied, only reported.
if !config.volumes.is_empty() {
eprintln!(
"supermachine(KVM): {} volume(s) requested via VmConfig but this image \
restores from a snapshot — the snapshot's OWN recorded volume attachments \
are re-attached (SMSNAP04), so the config volumes are ignored. To attach \
different volumes, cold-boot the image (e.g. bake_kvm_auto) with them.",
config.volumes.len()
);
}
if !config.virtiofs.is_empty() {
eprintln!(
"supermachine(KVM): {} virtio-fs mount(s) requested via VmConfig but this image \
restores from a snapshot — the snapshot's OWN recorded virtio-fs mounts are \
re-attached (SMSNAP05, DAX slots eagerly re-bound), so the config mounts are \
ignored. To attach different mounts, cold-boot the image with them.",
config.virtiofs.len()
);
}
LinuxVm::restore_from_file(snap)
.map_err(|e| Error::vm_msg(format!("restore snapshot {}: {e}", snap.display())))?
} else {
// Cold boot from kernel + initrd.
let kernel_path = parts
.kernel
.as_ref()
.expect("kvm cold-boot kernel (validated in from_snapshot)");
let initrd_path = parts
.initrd
.as_ref()
.expect("kvm cold-boot initrd (validated in from_snapshot)");
let kernel = std::fs::read(kernel_path).map_err(|e| {
Error::vm_msg(format!("read kernel {}: {e}", kernel_path.display()))
})?;
let initrd = std::fs::read(initrd_path).map_err(|e| {
Error::vm_msg(format!("read initrd {}: {e}", initrd_path.display()))
})?;
let disk_str: Option<String> = parts
.disk
.as_ref()
.map(|p| p.to_string_lossy().into_owned());
// Disk size comes from file metadata; 0 when there is no disk or it cannot be stat'ed.
let disk_size = parts
.disk
.as_ref()
.and_then(|p| std::fs::metadata(p).ok())
.map(|m| m.len())
.unwrap_or(0);
// Sizing is clamped: at least 1 MiB of memory, and 1..=255 vCPUs (stored as u8).
let mem_mib = config.memory_mib.unwrap_or(image.memory_mib).max(1);
let num_cpus = config.vcpus.unwrap_or(image.vcpus).clamp(1, 255) as u8;
// Host wall-clock seconds are passed to the guest on the kernel command line.
let host_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let cmdline = format!(
"console=ttyS0 panic=-1 reboot=t tsi_hijack supermachine.host_time={host_time}"
);
// Token generation failure is not fatal: it is logged and the VM boots without a token.
let tsi_token = crate::cli::TsiToken::generate()
.map(|t| t.bytes)
.map_err(|e| {
eprintln!("supermachine(KVM): tsi token generation failed ({e}); booting without vsock control-channel auth");
})
.ok();
// Config volumes win; image volumes are added only for guest paths not already present.
let mut vol_specs: Vec<crate::vmm::resources::VolumeSpec> = config.volumes.clone();
for (host_file, guest_path, size_bytes, _pristine) in &image.volumes {
if !vol_specs.iter().any(|v| v.guest_path == *guest_path) {
vol_specs.push(
crate::vmm::resources::VolumeSpec::new(
host_file.to_string_lossy().to_string(),
guest_path.clone(),
)
.with_size_bytes(*size_bytes),
);
}
}
let volumes = prepare_kvm_volumes(&vol_specs)?;
let virtiofs: Vec<crate::kvm::run::VirtioFsAttach> = config
.virtiofs
.iter()
.map(|m| crate::kvm::run::VirtioFsAttach {
host_path: m.host_path.clone(),
tag: m.guest_tag.clone(),
mount: m.guest_path.clone(),
})
.collect();
let lcfg = LinuxVmConfig {
mem_size: (mem_mib as usize) * 1024 * 1024,
num_cpus,
kernel: &kernel,
initrd: Some(&initrd),
disk_path: disk_str.as_deref(),
disk_size,
cmdline: &cmdline,
enable_vsock: true,
volumes: &volumes,
virtiofs: &virtiofs,
tsi_token,
enable_balloon: config.enable_balloon,
};
LinuxVm::new(&lcfg).map_err(|e| Error::vm_msg(format!("LinuxVm::new: {e}")))?
};
let exec_path = vm
.start_exec_bridge(1028)
.map_err(|e| Error::vm_msg(format!("start_exec_bridge: {e}")))?;
// A TSI mux failure is tolerated: a placeholder path derived from the exec path is used.
let mux_path = vm
.start_tsi_mux()
.unwrap_or_else(|_| exec_path.with_extension("mux-unwired"));
let running = vm.start_running();
let sync_path = exec_path.clone();
let time_sync_stop = Arc::new(AtomicBool::new(false));
let stop_c = time_sync_stop.clone();
// Background clock sync: retry every 200 ms until it succeeds, the stop flag
// is raised, or 30 seconds have passed.
std::thread::spawn(move || {
let deadline = std::time::Instant::now() + Duration::from_secs(30);
loop {
if stop_c.load(Ordering::SeqCst) {
break;
}
if sync_time_via_agent(&sync_path) {
break;
}
if std::time::Instant::now() >= deadline {
break;
}
std::thread::sleep(Duration::from_millis(200));
}
});
Ok(Vm {
running: Some(running),
vsock_mux_path: mux_path,
vsock_exec_path: exec_path,
own_vsock_mux_dir: None,
skip_cleanup: false,
time_sync_stop,
})
}
/// Path of the vsock mux Unix socket.
pub fn vsock_path(&self) -> &Path {
    self.vsock_mux_path.as_path()
}
/// Path of the exec/control Unix socket.
pub fn exec_path(&self) -> &Path {
    self.vsock_exec_path.as_path()
}
/// Spawns `argv` in the guest through the exec socket.
///
/// Runs inside a `supermachine.exec` tracing span that records the first
/// argument (empty string when `argv` is empty) and the argument count.
pub fn exec<I, S>(&self, argv: I) -> std::io::Result<crate::exec::ExecChild>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let argv: Vec<String> = argv.into_iter().map(Into::into).collect();
    let _span = tracing::info_span!(
        "supermachine.exec",
        argv0 = argv.first().map_or("", |first| first.as_str()),
        argc = argv.len(),
    )
    .entered();
    let builder = self.exec_builder().argv(argv);
    builder.spawn()
}
/// Returns an `ExecBuilder` bound to a copy of this VM's exec socket path.
pub fn exec_builder(&self) -> crate::exec::ExecBuilder {
    let socket = self.vsock_exec_path.clone();
    crate::exec::ExecBuilder::new(socket)
}
pub fn write_file(&self, path: &str, bytes: &[u8]) -> std::io::Result<()> {
let body = serde_json::json!({
"action": "write_file",
"path": path,
"data_b64": b64_encode(bytes),
});
crate::exec::send_control(&self.vsock_exec_path, &body)
}
/// Reads a guest file with the default size cap of 32 MiB.
pub fn read_file(&self, path: &str) -> std::io::Result<Vec<u8>> {
    const DEFAULT_MAX_BYTES: u64 = 32 * 1024 * 1024;
    self.read_file_with_max_bytes(path, DEFAULT_MAX_BYTES)
}
/// Reads a guest file via a `read_file` control request, passing
/// `max_bytes` along in the request.
///
/// Waits up to 30 seconds for the acknowledgement.
///
/// # Errors
/// Propagates transport errors; returns `InvalidData` when the ack has no
/// string `data_b64` field or when that field is not valid base64.
pub fn read_file_with_max_bytes(&self, path: &str, max_bytes: u64) -> std::io::Result<Vec<u8>> {
    let request = serde_json::json!({
        "action": "read_file",
        "path": path,
        "max_bytes": max_bytes,
    });
    let ack_timeout = std::time::Duration::from_secs(30);
    let ack =
        crate::exec::send_control_with_ack(&self.vsock_exec_path, &request, Some(ack_timeout))?;
    let Some(data_b64) = ack.get("data_b64").and_then(|v| v.as_str()) else {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "read_file: agent ack missing data_b64",
        ));
    };
    match b64_decode(data_b64) {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
    }
}
/// Sends a `signal` control request with the given signal number.
pub fn workload_signal(&self, signum: i32) -> std::io::Result<()> {
    let request = serde_json::json!({
        "action": "signal",
        "signum": signum,
    });
    crate::exec::send_control(&self.vsock_exec_path, &request)
}
/// Opens a new Unix stream to the vsock mux socket.
pub fn connect(&self) -> std::io::Result<UnixStream> {
    let socket = self.vsock_path();
    UnixStream::connect(socket)
}
/// Forwards a balloon inflate request of `pages` to the running VM.
///
/// Returns `false` when there is no running handle; otherwise returns
/// whatever the running VM reports.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn request_balloon_inflate(&self, pages: u32) -> bool {
    match self.running.as_ref() {
        Some(running) => running.request_balloon_inflate(pages),
        None => false,
    }
}
/// Forwards a loopback TCP port on the host to `guest_port` in the VM.
///
/// Binds `127.0.0.1:host_port` (pass `0` to let the OS choose a port) and
/// runs the accept loop on a dedicated thread. The returned `TcpForwarder`
/// holds the stop flag, the thread handle and the bound address.
///
/// The thread is named after the port that was actually bound, not the
/// requested one. With `host_port == 0` every forwarder would otherwise be
/// called `supermachine-tcp-0`, which makes them indistinguishable in
/// debuggers and thread listings. For a non-zero `host_port` the name is
/// unchanged.
///
/// # Errors
/// Fails when the listener cannot be bound or configured, or when the
/// thread cannot be spawned.
pub fn expose_tcp(&self, host_port: u16, guest_port: u16) -> std::io::Result<TcpForwarder> {
    let listener = TcpListener::bind(("127.0.0.1", host_port))?;
    let bound = listener.local_addr()?;
    // The accept loop expects a blocking listener.
    listener.set_nonblocking(false)?;
    let stop = Arc::new(AtomicBool::new(false));
    let stop_thread = Arc::clone(&stop);
    let vsock_path = self.vsock_mux_path.clone();
    let handle = std::thread::Builder::new()
        .name(format!("supermachine-tcp-{}", bound.port()))
        .spawn(move || {
            accept_loop(listener, vsock_path, stop_thread, guest_port);
        })
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
    Ok(TcpForwarder {
        stop,
        handle: Some(handle),
        bound,
    })
}
/// Asks the running VM handle to expose a TLS endpoint described by `cfg`
/// and returns the socket address the handle reports.
///
/// # Errors
/// * `NotConnected` when there is no running handle;
/// * `Other`, carrying the handle's error rendered as text, when the
///   handle's `expose_tls` fails.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn expose_tls(
    &self,
    cfg: crate::vmm::tls::TlsConfig,
) -> std::io::Result<std::net::SocketAddr> {
    use std::io::{Error as IoError, ErrorKind};

    let handle = match self.running.as_ref() {
        Some(handle) => handle,
        None => return Err(IoError::new(ErrorKind::NotConnected, "VM is not running")),
    };
    match handle.expose_tls(cfg) {
        Ok(addr) => Ok(addr),
        Err(e) => Err(IoError::new(ErrorKind::Other, e.to_string())),
    }
}
/// Consumes the VM: shuts down its pool (if one is still attached) and
/// then removes the vsock socket files.
///
/// # Errors
/// Returns the pool's shutdown error converted into [`Error`]. In that case
/// this method returns before its own `cleanup_socket` call.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn stop(mut self) -> Result<(), Error> {
    if let Some(pool) = self.pool.take() {
        if let Err(e) = pool.shutdown() {
            return Err(Error::from(e));
        }
    }
    self.cleanup_socket();
    Ok(())
}
/// Consumes the VM: stops the running handle (if any), ignoring the result
/// of that stop, then removes the vsock socket files.
///
/// Always returns `Ok(())`.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn stop(mut self) -> Result<(), Error> {
    let taken = self.running.take();
    if let Some(handle) = taken {
        // Best effort: a failed stop is deliberately not reported.
        let _ = handle.stop();
    }
    self.cleanup_socket();
    Ok(())
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub(crate) fn reset_to_snapshot(&self) -> Result<(), Error> {
self.running
.as_ref()
.ok_or_else(|| Error::vm_msg("reset_to_snapshot: VM has no running handle".to_owned()))?
.reset_to_snapshot()
.map_err(|e| Error::vm_msg(format!("reset_to_snapshot: {e}")))
}
/// Consumes the VM, captures a snapshot into `dest_dir`, and reopens that
/// directory as an [`Image`].
///
/// Steps, in order:
/// 1. require `image_meta` and a pool (both must be present);
/// 2. create `dest_dir` and ask the pool to write `restore.snap` there,
///    waiting up to 60 seconds;
/// 3. write `metadata.json` describing memory, vCPUs, layers, the optional
///    delta squashfs and the snapshot path;
/// 4. shut the pool down (result ignored), remove the vsock sockets and set
///    `skip_cleanup` so `Drop` does not repeat the teardown;
/// 5. load the result with `Image::from_snapshot`.
///
/// # Errors
/// `Error::Vm` for a missing `image_meta`/pool, a failed capture or a
/// metadata serialization failure; `Error::Io` for filesystem failures;
/// plus anything `Image::from_snapshot` returns.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn snapshot(mut self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
    let dest_dir = dest_dir.into();

    let meta = match self.image_meta.clone() {
        Some(meta) => meta,
        None => {
            return Err(Error::vm_msg(
                "Vm::snapshot requires an in-process Vm (use image.start, not image.acquire)"
                    .to_owned(),
            ))
        }
    };
    let pool = match self.pool.as_ref() {
        Some(pool) => pool,
        None => {
            return Err(Error::vm_msg(
                "Vm::snapshot: no pool to drive the capture".to_owned(),
            ))
        }
    };

    std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
    let snap_path = dest_dir.join("restore.snap");
    let snap_path_text = snap_path.to_string_lossy().to_string();
    let capture_timeout = Duration::from_secs(60);
    let _capture = match pool.snapshot_timeout(snap_path_text.clone(), capture_timeout) {
        Ok(outcome) => outcome,
        Err(e) => {
            return Err(Error::Vm {
                msg: format!("snapshot capture failed: {e:?}"),
                source: None,
            })
        }
    };

    let layer_paths: Vec<String> = meta
        .layers
        .iter()
        .map(|layer| layer.to_string_lossy().to_string())
        .collect();
    let delta_path: Option<String> = meta
        .delta_squashfs
        .as_ref()
        .map(|delta| delta.to_string_lossy().to_string());
    let metadata = serde_json::json!({
        "memory_mib": meta.memory_mib,
        "vcpus": meta.vcpus,
        "layers": layer_paths,
        "delta_squashfs": delta_path,
        "snapshot_base": snap_path_text,
        "baked_at": chrono_rfc3339_now(),
        "source": "Vm::snapshot",
    });
    let metadata_text = serde_json::to_string_pretty(&metadata)
        .map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?;
    std::fs::write(dest_dir.join("metadata.json"), metadata_text).map_err(Error::Io)?;

    // Tear down by hand, then tell Drop not to do it again.
    if let Some(pool) = self.pool.take() {
        let _ = pool.shutdown();
    }
    self.cleanup_socket();
    self.skip_cleanup = true;
    Image::from_snapshot(&dest_dir)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn snapshot(mut self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
let dest_dir = dest_dir.into();
let running = self.running.take().ok_or_else(|| {
Error::vm_msg("Vm::snapshot: this VM is not running (already stopped?)".to_owned())
})?;
std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
let snap = running
.snapshot()
.map_err(|e| Error::vm_msg(format!("capture snapshot: {e}")))?;
let snap_path = dest_dir.join("vm.snap");
snap.save(&snap_path)
.map_err(|e| Error::vm_msg(format!("write snapshot {}: {e}", snap_path.display())))?;
let metadata = serde_json::json!({
"backend": "kvm",
"kvm_snapshot": snap_path.to_string_lossy(),
"memory_mib": (snap.mem_size() >> 20) as u32,
"vcpus": snap.num_cpus() as u32,
"baked_at": chrono_rfc3339_now(),
"source": "Vm::snapshot",
});
std::fs::write(
dest_dir.join("metadata.json"),
serde_json::to_string_pretty(&metadata)
.map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
self.cleanup_socket();
self.skip_cleanup = true;
Image::from_snapshot(&dest_dir)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn snapshot_live(&self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
let dest_dir = dest_dir.into();
let running = self
.running
.as_ref()
.ok_or_else(|| Error::vm_msg("Vm::snapshot_live: VM not running".to_owned()))?;
std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
let snap = running
.snapshot_live()
.map_err(|e| Error::vm_msg(format!("capture live snapshot: {e}")))?;
let snap_path = dest_dir.join("restore.snap");
snap.save(&snap_path)
.map_err(|e| Error::vm_msg(format!("write snapshot {}: {e}", snap_path.display())))?;
write_kvm_snapshot_metadata(
&dest_dir,
"restore.snap",
None,
snap.mem_size(),
snap.num_cpus(),
)?;
Image::from_snapshot(&dest_dir)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn snapshot_diff_live(
&self,
dest_dir: impl Into<PathBuf>,
base_snap: &Path,
) -> Result<Image, Error> {
let dest_dir = dest_dir.into();
let running = self
.running
.as_ref()
.ok_or_else(|| Error::vm_msg("Vm::snapshot_diff_live: VM not running".to_owned()))?;
std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
let snap = running
.snapshot_live()
.map_err(|e| Error::vm_msg(format!("capture live snapshot: {e}")))?;
let snap_path = dest_dir.join("restore.snap");
snap.save_diff(&snap_path, base_snap).map_err(|e| {
Error::vm_msg(format!("write diff snapshot {}: {e}", snap_path.display()))
})?;
write_kvm_snapshot_metadata(
&dest_dir,
"restore.snap",
Some(base_snap),
snap.mem_size(),
snap.num_cpus(),
)?;
Image::from_snapshot(&dest_dir)
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub fn snapshot_diff(
mut self,
dest_dir: impl Into<PathBuf>,
base: &Image,
) -> Result<Image, Error> {
let dest_dir = dest_dir.into();
let base_snap = base
.kvm
.as_ref()
.and_then(|k| k.snapshot.clone())
.ok_or_else(|| {
Error::vm_msg(
"Vm::snapshot_diff: base Image has no KVM snapshot to diff against".to_owned(),
)
})?;
let running = self.running.take().ok_or_else(|| {
Error::vm_msg("Vm::snapshot_diff: this VM is not running (already stopped?)".to_owned())
})?;
std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
let snap = running
.snapshot()
.map_err(|e| Error::vm_msg(format!("capture snapshot: {e}")))?;
let snap_path = dest_dir.join("vm.snap");
snap.save_diff(&snap_path, &base_snap).map_err(|e| {
Error::vm_msg(format!("write diff snapshot {}: {e}", snap_path.display()))
})?;
let metadata = serde_json::json!({
"backend": "kvm",
"kvm_snapshot": snap_path.to_string_lossy(),
"kvm_snapshot_base": base_snap.to_string_lossy(),
"memory_mib": (snap.mem_size() >> 20) as u32,
"vcpus": snap.num_cpus() as u32,
"baked_at": chrono_rfc3339_now(),
"source": "Vm::snapshot_diff",
});
std::fs::write(
dest_dir.join("metadata.json"),
serde_json::to_string_pretty(&metadata)
.map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
self.cleanup_socket();
self.skip_cleanup = true;
Image::from_snapshot(&dest_dir)
}
/// Best-effort removal of the mux and exec socket files, followed by the
/// owned mux directory when one is recorded. Every failure is ignored.
fn cleanup_socket(&self) {
    let mux_socket = &self.vsock_mux_path;
    let exec_socket = &self.vsock_exec_path;
    let _ = std::fs::remove_file(mux_socket);
    let _ = std::fs::remove_file(exec_socket);
    if let Some(owned_dir) = self.own_vsock_mux_dir.as_ref() {
        // `remove_dir` only succeeds on an empty directory.
        let _ = std::fs::remove_dir(owned_dir);
    }
}
}
/// The standard base64 alphabet; index = 6-bit value, element = ASCII symbol.
const B64_ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes `bytes` as padded base64 using [`B64_ALPHA`].
///
/// The output length is always a multiple of four; a trailing group of one
/// or two input bytes is padded with `==` or `=` respectively.
pub fn b64_encode(bytes: &[u8]) -> String {
    // Picks the alphabet symbol for the 6-bit field of `group` at `shift`.
    let symbol = |group: u32, shift: u32| B64_ALPHA[((group >> shift) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for triple in bytes.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit group; missing
        // bytes stay zero.
        let group = triple
            .iter()
            .enumerate()
            .fold(0u32, |acc, (pos, &byte)| {
                acc | (u32::from(byte) << (16 - 8 * pos as u32))
            });
        encoded.push(symbol(group, 18));
        encoded.push(symbol(group, 12));
        match triple.len() {
            3 => {
                encoded.push(symbol(group, 6));
                encoded.push(symbol(group, 0));
            }
            2 => {
                encoded.push(symbol(group, 6));
                encoded.push('=');
            }
            _ => encoded.push_str("=="),
        }
    }
    encoded
}

/// Decodes padded base64 text produced with [`B64_ALPHA`].
///
/// ASCII whitespace anywhere in `s` is ignored. After whitespace removal
/// the input must be a multiple of four characters long, `=` may appear
/// only as one or two trailing characters of the final group, and every
/// other character must belong to the alphabet.
///
/// # Errors
/// Returns a human-readable message describing the first problem found:
/// bad length, misplaced/excess padding, or an invalid character.
pub fn b64_decode(s: &str) -> Result<Vec<u8>, String> {
    // Reverse lookup: ASCII symbol -> 6-bit value (None = not in alphabet).
    let mut reverse = [None::<u8>; 256];
    for (value, &symbol) in (0u8..).zip(B64_ALPHA.iter()) {
        reverse[usize::from(symbol)] = Some(value);
    }

    let compact: Vec<u8> = s
        .bytes()
        .filter(|byte| !byte.is_ascii_whitespace())
        .collect();
    if compact.len() % 4 != 0 {
        return Err(format!(
            "base64 length {} is not a multiple of 4",
            compact.len()
        ));
    }

    let quad_count = compact.len() / 4;
    let mut decoded = Vec::with_capacity(quad_count * 3);
    for (index, quad) in compact.chunks_exact(4).enumerate() {
        let padding = quad.iter().filter(|&&byte| byte == b'=').count();
        let is_final = index + 1 == quad_count;
        let padding_ok = match padding {
            0 => true,
            1 => is_final && quad[3] == b'=',
            2 => is_final && quad[2..] == [b'=', b'='],
            _ => false,
        };
        if !padding_ok {
            return Err(format!(
                "base64 misplaced or excess padding in chunk {index}"
            ));
        }

        let mut group: u32 = 0;
        for &byte in quad {
            let sextet = if byte == b'=' {
                0
            } else {
                match reverse[usize::from(byte)] {
                    Some(value) => u32::from(value),
                    None => return Err(format!("invalid base64 character {:#x}", byte)),
                }
            };
            group = (group << 6) | sextet;
        }

        // The 24 decoded bits sit in the low three bytes; each `=` removes
        // one byte from the tail.
        let [_, high, middle, low] = group.to_be_bytes();
        decoded.extend_from_slice(&[high, middle, low][..3 - padding]);
    }
    Ok(decoded)
}
/// Tears the VM down when it goes out of scope, unless `skip_cleanup` says
/// the teardown has already been handled elsewhere.
///
/// On macOS/aarch64 the pool (if still attached) is shut down; on
/// Linux/x86_64 `time_sync_stop` is raised and the running handle (if any)
/// is stopped. In both cases errors are ignored and the vsock socket files
/// are removed afterwards.
#[cfg(any(
    all(target_os = "macos", target_arch = "aarch64"),
    all(target_os = "linux", target_arch = "x86_64")
))]
impl Drop for Vm {
    fn drop(&mut self) {
        if !self.skip_cleanup {
            #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
            if let Some(warm_pool) = self.pool.take() {
                let _ = warm_pool.shutdown();
            }
            #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
            {
                self.time_sync_stop.store(true, Ordering::SeqCst);
                if let Some(handle) = self.running.take() {
                    let _ = handle.stop();
                }
            }
            self.cleanup_socket();
        }
    }
}
/// Handle to a background TCP accept thread.
///
/// Stopping (explicitly via [`TcpForwarder::stop`] or implicitly on drop)
/// raises the shared stop flag, wakes the accept thread and joins it.
pub struct TcpForwarder {
    /// Flag the accept thread polls to know it should exit.
    stop: Arc<AtomicBool>,
    /// Join handle of the accept thread; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
    /// Address the listener is bound to.
    bound: SocketAddr,
}

impl TcpForwarder {
    /// Returns the address the listener is bound to.
    pub fn local_addr(&self) -> SocketAddr {
        self.bound
    }

    /// Stops the forwarder and waits for the accept thread to exit.
    pub fn stop(mut self) {
        self.shutdown();
    }

    /// Raises the stop flag and, if the accept thread has not been joined
    /// yet, wakes it with a throwaway connection and joins it.
    ///
    /// Safe to call more than once: `stop()` calls it and `Drop` then calls
    /// it again, so the wake-up connection is only attempted while a thread
    /// handle is still held. Without that guard the second call would dial
    /// a port whose listener has already been closed.
    fn shutdown(&mut self) {
        self.stop.store(true, Ordering::SeqCst);
        if let Some(worker) = self.handle.take() {
            // The accept thread blocks in `accept`; a connection makes it
            // return so it can observe the stop flag.
            let _ = TcpStream::connect_timeout(&self.bound, Duration::from_millis(200));
            let _ = worker.join();
        }
    }
}

impl Drop for TcpForwarder {
    fn drop(&mut self) {
        self.shutdown();
    }
}
fn accept_loop(listener: TcpListener, vsock_path: PathBuf, stop: Arc<AtomicBool>, guest_port: u16) {
for incoming in listener.incoming() {
if stop.load(Ordering::SeqCst) {
break;
}
let tcp = match incoming {
Ok(s) => s,
Err(_) => continue,
};
let vsock = vsock_path.clone();
std::thread::Builder::new()
.name("supermachine-tcp-conn".into())
.spawn(move || {
if let Err(e) = splice_tcp_to_unix(tcp, &vsock, guest_port) {
eprintln!("supermachine: tcp forward: {e}");
}
})
.ok();
}
}
/// Connects to the Unix socket at `vsock_path` and relays bytes between it
/// and `tcp` in both directions until both directions have finished.
///
/// When `guest_port` is non-zero, a 20-byte preamble is written to the
/// Unix socket first: the 16-byte tag `SMUX-PORT-V1` padded with NULs,
/// followed by the port as a big-endian `u32`.
///
/// # Errors
/// Returns an error if connecting, writing the preamble, cloning either
/// stream, or spawning a relay thread fails. Errors inside the relay
/// threads themselves are ignored.
fn splice_tcp_to_unix(tcp: TcpStream, vsock_path: &Path, guest_port: u16) -> std::io::Result<()> {
    const PORT_TAG: &[u8; 16] = b"SMUX-PORT-V1\0\0\0\0";

    let mut unix = UnixStream::connect(vsock_path)?;
    if guest_port != 0 {
        let mut preamble = [0u8; 20];
        let (tag, port) = preamble.split_at_mut(PORT_TAG.len());
        tag.copy_from_slice(PORT_TAG);
        port.copy_from_slice(&u32::from(guest_port).to_be_bytes());
        unix.write_all(&preamble)?;
    }

    // Each direction gets its own handle pair: one end to read, a clone of
    // the other end to write.
    let tcp_writer = tcp.try_clone()?;
    let unix_writer = unix.try_clone()?;

    let client_to_guest = std::thread::Builder::new()
        .name("supermachine-tcp-c2g".into())
        .spawn(move || {
            let _ = pump(tcp, unix_writer);
        })?;
    let guest_to_client = std::thread::Builder::new()
        .name("supermachine-tcp-g2c".into())
        .spawn(move || {
            let _ = pump(unix, tcp_writer);
        })?;

    for relay in [client_to_guest, guest_to_client] {
        let _ = relay.join();
    }
    Ok(())
}
/// Copies everything from `r` to `w`, then half-closes `w` for writing.
///
/// Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried.
/// The write side of `w` is shut down on every exit path, including read
/// and write errors, so the peer behind `w` always observes end-of-stream.
/// Previously the half-close was skipped on the error paths.
///
/// # Errors
/// Returns the first read or write error encountered. A failure of the
/// half-close itself is ignored.
fn pump<R, W>(mut r: R, mut w: W) -> std::io::Result<()>
where
    R: Read,
    W: Write + Shutdownable,
{
    let mut buf = [0u8; 16 * 1024];
    let outcome = loop {
        match r.read(&mut buf) {
            Ok(0) => break Ok(()),
            Ok(n) => {
                if let Err(e) = w.write_all(&buf[..n]) {
                    break Err(e);
                }
            }
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => break Err(e),
        }
    };
    let _ = w.shutdown_write();
    outcome
}

/// A stream whose write half can be closed independently of its read half.
trait Shutdownable {
    /// Shuts down the write half of the stream.
    fn shutdown_write(&mut self) -> std::io::Result<()>;
}

impl Shutdownable for TcpStream {
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        TcpStream::shutdown(self, std::net::Shutdown::Write)
    }
}

impl Shutdownable for UnixStream {
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        UnixStream::shutdown(self, std::net::Shutdown::Write)
    }
}
/// Produces a number intended for building unique names: the current wall
/// clock time in nanoseconds since the Unix epoch (truncated to `u64`, or 0
/// if the clock reads earlier than the epoch) plus a process-wide counter
/// that increases by one on every call. The addition wraps on overflow.
fn unique_suffix() -> u64 {
    use std::sync::atomic::AtomicU64;
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    let ticket = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    since_epoch.wrapping_add(ticket)
}
/// Formats the current wall-clock time as a UTC timestamp of the form
/// `YYYY-MM-DDTHH:MM:SSZ`, using only the standard library.
///
/// A clock reading earlier than the Unix epoch is treated as the epoch
/// itself.
fn chrono_rfc3339_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Converts a count of days since 1970-01-01 into a proleptic Gregorian
    /// `(year, month, day)` using 400-year eras that start on March 1st.
    fn civil_from_days(days: i64) -> (i64, i64, i64) {
        let shifted = days + 719_468;
        let era = (if shifted >= 0 { shifted } else { shifted - 146_096 }) / 146_097;
        let day_of_era = shifted - era * 146_097;
        let year_of_era =
            (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        // Month index counted from March (0) through February (11).
        let march_based_month = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
        let month = if march_based_month < 10 {
            march_based_month + 3
        } else {
            march_based_month - 9
        };
        // January and February belong to the following calendar year.
        let year = year_of_era + era * 400 + i64::from(month <= 2);
        (year, month, day)
    }

    let unix_secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    };
    let (year, month, day) = civil_from_days(unix_secs.div_euclid(86_400));
    let second_of_day = unix_secs.rem_euclid(86_400);
    let hour = second_of_day / 3600;
    let minute = second_of_day % 3600 / 60;
    let second = second_of_day % 60;
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
// Unit tests for the mount-related builder methods of `OciImageBuilder`.
// Each entry of `b.mounts` is inspected as a 4-tuple: host path, tag,
// symlink policy, guest path (in that order).
#[cfg(test)]
mod oci_image_builder_mount_tests {
use super::OciImageBuilder;
use crate::vmm::resources::SymlinkPolicy;
use std::path::PathBuf;
// `with_mount` records one entry holding the host path, tag and guest path,
// with the symlink policy set to `SymlinkPolicy::Opaque`.
#[test]
fn with_mount_stores_all_four_fields() {
let b = OciImageBuilder::new("alpine").with_mount("/host/x", "tag", "/workspace");
assert_eq!(b.mounts.len(), 1);
let (h, t, p, gp) = &b.mounts[0];
assert_eq!(h, &PathBuf::from("/host/x"));
assert_eq!(t, "tag");
assert_eq!(p, &SymlinkPolicy::Opaque);
assert_eq!(gp, "/workspace");
}
// `with_mount_symlinks` stores the explicitly supplied policy alongside the
// other three fields.
#[test]
fn with_mount_symlinks_combines_policy_and_guest_path() {
let b = OciImageBuilder::new("alpine").with_mount_symlinks(
"/host/x",
"ws",
"/workspace",
SymlinkPolicy::Follow,
);
let (h, t, p, gp) = &b.mounts[0];
assert_eq!(h, &PathBuf::from("/host/x"));
assert_eq!(t, "ws");
assert_eq!(p, &SymlinkPolicy::Follow);
assert_eq!(gp, "/workspace");
}
// The `Deny` policy is stored unchanged.
#[test]
fn with_mount_symlinks_deny_policy_round_trips() {
let b = OciImageBuilder::new("alpine").with_mount_symlinks(
"/h",
"t",
"/g",
SymlinkPolicy::Deny,
);
assert_eq!(b.mounts[0].2, SymlinkPolicy::Deny);
}
// Chaining several mount calls appends one entry per call, each keeping its
// own guest path and policy.
#[test]
fn multiple_mounts_compose_independently() {
let b = OciImageBuilder::new("alpine")
.with_mount("/host/src", "workspace", "/workspace")
.with_mount("/host/cache", "ro-cache", "/cache")
.with_mount_symlinks("/host/strict", "strict", "/strict", SymlinkPolicy::Deny);
assert_eq!(b.mounts.len(), 3);
assert_eq!(b.mounts[0].3, "/workspace");
assert_eq!(b.mounts[1].3, "/cache");
assert_eq!(b.mounts[2].3, "/strict");
assert_eq!(b.mounts[2].2, SymlinkPolicy::Deny);
}
// Entries stay in call order even when guest paths nest (`/a/b`, `/a`,
// `/a/c`); the builder does not reorder them.
#[test]
fn mounts_preserve_insertion_order() {
let b = OciImageBuilder::new("alpine")
.with_mount("/h1", "first", "/a/b")
.with_mount("/h2", "second", "/a")
.with_mount("/h3", "third", "/a/c");
let tags: Vec<&str> = b.mounts.iter().map(|m| m.1.as_str()).collect();
assert_eq!(tags, vec!["first", "second", "third"]);
}
}
// Unit tests for `map_bake_error`, which turns a bake failure message into
// an `Error` variant. The inputs below use a `KERNEL_PANIC|<first line>|<frames>`
// sentinel whose frames are separated by the 0x1F byte.
#[cfg(test)]
mod map_bake_error_tests {
use super::{map_bake_error, Error};
// A sentinel message with three frames decodes into `Error::KernelPanic`
// carrying the first line and the three frames in order.
#[test]
fn decodes_kernel_panic_sentinel() {
let msg = "KERNEL_PANIC|Kernel panic - not syncing: oops|frame_a\x1Fframe_b\x1Fframe_c"
.to_string();
let err = map_bake_error("nginx:alpine", msg);
match err {
Error::KernelPanic { first_line, stack } => {
assert!(first_line.contains("Kernel panic"));
assert_eq!(stack, vec!["frame_a", "frame_b", "frame_c"]);
}
other => panic!("expected KernelPanic, got {other:?}"),
}
}
// A sentinel with nothing after the second `|` still decodes as a kernel
// panic; the test accepts either an empty stack or a single entry.
#[test]
fn kernel_panic_with_empty_stack() {
let msg = "KERNEL_PANIC|Internal error: Oops|".to_owned();
match map_bake_error("img", msg) {
Error::KernelPanic { first_line, stack } => {
assert_eq!(first_line, "Internal error: Oops");
assert!(stack.len() <= 1);
}
other => panic!("expected KernelPanic, got {other:?}"),
}
}
// A message without the sentinel maps to `Error::Bake` and keeps its text.
#[test]
fn non_panic_messages_unchanged() {
let err = map_bake_error(
"img",
"supermachine snapshot timeout; see bake.log".to_owned(),
);
match err {
Error::Bake { msg, .. } => assert!(msg.contains("timeout")),
other => panic!("expected Bake, got {other:?}"),
}
}
}
// Unit and property tests for `b64_encode` / `b64_decode`.
#[cfg(test)]
mod base64_tests {
use super::{b64_decode, b64_encode};
use proptest::prelude::*;
// Checks encode and decode against fixed input/output pairs covering every
// padding length (0, 1 and 2 `=` characters) plus the empty input.
#[test]
fn known_vectors_round_trip() {
let cases: &[(&[u8], &str)] = &[
(b"", ""),
(b"f", "Zg=="),
(b"fo", "Zm8="),
(b"foo", "Zm9v"),
(b"foob", "Zm9vYg=="),
(b"fooba", "Zm9vYmE="),
(b"foobar", "Zm9vYmFy"),
];
for (raw, enc) in cases {
assert_eq!(b64_encode(raw), *enc, "encode {raw:?}");
assert_eq!(b64_decode(enc).unwrap(), *raw, "decode {enc:?}");
}
}
// Padding must be at most two `=` characters, only at the very end of the
// final 4-character group; everything else is rejected.
#[test]
fn decode_rejects_malformed_padding() {
for bad in ["====", "A===", "=AAA", "AA==AAAA", "Zm==Zm9v", "=Zm9"] {
assert!(
b64_decode(bad).is_err(),
"{bad:?} must be rejected as malformed padding"
);
}
}
// Inputs whose length is not a multiple of four, or which contain
// characters outside the alphabet, are rejected.
#[test]
fn decode_rejects_bad_length_and_chars() {
assert!(b64_decode("ABC").is_err(), "length not a multiple of 4");
assert!(b64_decode("Zm9").is_err());
assert!(b64_decode("Zm$v").is_err(), "invalid character");
assert!(b64_decode("Zm9*").is_err());
}
// Spaces and newlines inside or after the payload are ignored by the decoder.
#[test]
fn decode_tolerates_whitespace() {
assert_eq!(b64_decode("Zm9v YmFy").unwrap(), b"foobar");
assert_eq!(b64_decode("Zm9v\nYmFy\n").unwrap(), b"foobar");
}
// Property test: for arbitrary byte vectors of length 0..512, the encoding
// has a length divisible by four and decodes back to the original bytes.
proptest! {
#![proptest_config(ProptestConfig::with_cases(1024))]
#[test]
fn round_trips_for_all_bytes(data in proptest::collection::vec(any::<u8>(), 0..512)) {
let enc = b64_encode(&data);
prop_assert_eq!(enc.len() % 4, 0);
prop_assert_eq!(b64_decode(&enc).unwrap(), data);
}
}
}