use std::collections::VecDeque;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::assets::AssetPaths;
use crate::vmm::pool::{PoolClientError, WarmPool, WarmPoolError};
use crate::vmm::resources::VmResources;
use crate::vmm::runner::RunOptions;
/// Errors surfaced by image loading, baking, networking, and VM-pool
/// operations.
///
/// `#[non_exhaustive]`: more variants may be added without a breaking
/// release, so downstream matches need a catch-all arm.
#[non_exhaustive]
pub enum Error {
    /// Snapshot/image loading or parsing failed.
    Image {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// A VM or pool-worker operation failed.
    Vm {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// Required on-disk assets were missing or unusable.
    Assets {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// A raw I/O error with no extra context.
    Io(std::io::Error),
    /// A networking operation failed.
    Network {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// No cached snapshot existed where one was required.
    CacheMiss {
        msg: String,
    },
    /// A cached snapshot exists but cannot be loaded by this binary.
    CacheInvalid {
        msg: String,
    },
    /// Baking a new snapshot failed.
    Bake {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// The requested OCI image could not be found.
    ImageNotFound {
        image: String,
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// Registry authentication failed for `image`.
    RegistryAuth {
        image: String,
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// The registry could not be reached at all.
    RegistryUnreachable {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// A worker could not be acquired before the configured timeout.
    PoolExhausted {
        msg: String,
    },
    /// The guest kernel panicked during bake; carries the panic headline and
    /// the captured stack lines.
    KernelPanic {
        first_line: String,
        stack: Vec<String>,
    },
}

impl Error {
    // -- terse constructors used throughout the crate ------------------------

    pub(crate) fn image_msg(msg: impl Into<String>) -> Self {
        Self::Image { msg: msg.into(), source: None }
    }

    pub(crate) fn vm_msg(msg: impl Into<String>) -> Self {
        Self::Vm { msg: msg.into(), source: None }
    }

    pub(crate) fn assets_msg(msg: impl Into<String>) -> Self {
        Self::Assets { msg: msg.into(), source: None }
    }

    pub(crate) fn network_msg(msg: impl Into<String>) -> Self {
        Self::Network { msg: msg.into(), source: None }
    }

    pub(crate) fn bake_msg(msg: impl Into<String>) -> Self {
        Self::Bake { msg: msg.into(), source: None }
    }

    pub(crate) fn cache_miss(msg: impl Into<String>) -> Self {
        Self::CacheMiss { msg: msg.into() }
    }

    pub(crate) fn cache_invalid(msg: impl Into<String>) -> Self {
        Self::CacheInvalid { msg: msg.into() }
    }

    pub(crate) fn image_not_found(image: impl Into<String>, msg: impl Into<String>) -> Self {
        Self::ImageNotFound { image: image.into(), msg: msg.into(), source: None }
    }

    pub(crate) fn registry_auth(image: impl Into<String>, msg: impl Into<String>) -> Self {
        Self::RegistryAuth { image: image.into(), msg: msg.into(), source: None }
    }

    pub(crate) fn registry_unreachable(msg: impl Into<String>) -> Self {
        Self::RegistryUnreachable { msg: msg.into(), source: None }
    }

    pub(crate) fn pool_exhausted(msg: impl Into<String>) -> Self {
        Self::PoolExhausted { msg: msg.into() }
    }

    pub(crate) fn kernel_panic(first_line: impl Into<String>, stack: Vec<String>) -> Self {
        Self::KernelPanic { first_line: first_line.into(), stack }
    }

    /// Heuristic: does this error look like the known multi-vCPU restore
    /// stall? Only `Vm` errors can match; the substrings cover EAGAIN /
    /// ETIMEDOUT-style failures (35/60 are the macOS codes — NOTE(review):
    /// platform-specific, confirm if ported) plus probe timeouts that are not
    /// plain protocol-version mismatches.
    pub(crate) fn is_likely_multi_vcpu_restore_stall(&self) -> bool {
        let Self::Vm { msg, .. } = self else {
            return false;
        };
        let needles = [
            "Resource temporarily unavailable",
            "os error 35",
            "os error 60",
            "timed out",
        ];
        needles.iter().any(|n| msg.contains(n))
            || (msg.contains("probe failed") && !msg.contains("speaks protocol v"))
    }
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, source } => f
.debug_struct("Image")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Vm { msg, source } => f
.debug_struct("Vm")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Assets { msg, source } => f
.debug_struct("Assets")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Io(e) => f.debug_tuple("Io").field(e).finish(),
Error::Network { msg, source } => f
.debug_struct("Network")
.field("msg", msg)
.field("source", source)
.finish(),
Error::CacheMiss { msg } => f.debug_struct("CacheMiss").field("msg", msg).finish(),
Error::CacheInvalid { msg } => {
f.debug_struct("CacheInvalid").field("msg", msg).finish()
}
Error::Bake { msg, source } => f
.debug_struct("Bake")
.field("msg", msg)
.field("source", source)
.finish(),
Error::ImageNotFound { image, msg, source } => f
.debug_struct("ImageNotFound")
.field("image", image)
.field("msg", msg)
.field("source", source)
.finish(),
Error::RegistryAuth { image, msg, source } => f
.debug_struct("RegistryAuth")
.field("image", image)
.field("msg", msg)
.field("source", source)
.finish(),
Error::RegistryUnreachable { msg, source } => f
.debug_struct("RegistryUnreachable")
.field("msg", msg)
.field("source", source)
.finish(),
Error::PoolExhausted { msg } => {
f.debug_struct("PoolExhausted").field("msg", msg).finish()
}
Error::KernelPanic { first_line, stack } => f
.debug_struct("KernelPanic")
.field("first_line", first_line)
.field("stack_lines", &stack.len())
.finish(),
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, .. } => write!(f, "image: {msg}"),
Error::Vm { msg, .. } => write!(f, "vm: {msg}"),
Error::Assets { msg, .. } => write!(f, "assets: {msg}"),
Error::Io(e) => write!(f, "io: {e}"),
Error::Network { msg, .. } => write!(f, "network: {msg}"),
Error::CacheMiss { msg } => write!(f, "cache miss: {msg}"),
Error::CacheInvalid { msg } => write!(f, "cache invalid: {msg}"),
Error::Bake { msg, .. } => write!(f, "bake: {msg}"),
Error::ImageNotFound { image, msg, .. } => {
write!(f, "image not found ({image}): {msg}")
}
Error::RegistryAuth { image, msg, .. } => {
write!(f, "registry auth failed for {image}: {msg}")
}
Error::RegistryUnreachable { msg, .. } => {
write!(f, "registry unreachable: {msg}")
}
Error::PoolExhausted { msg } => write!(f, "pool exhausted: {msg}"),
Error::KernelPanic { first_line, stack } => {
writeln!(f, "kernel panic during bake: {first_line}")?;
for line in stack {
writeln!(f, " {line}")?;
}
Ok(())
}
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Image { source, .. }
| Error::Vm { source, .. }
| Error::Assets { source, .. }
| Error::Network { source, .. }
| Error::Bake { source, .. }
| Error::ImageNotFound { source, .. }
| Error::RegistryAuth { source, .. }
| Error::RegistryUnreachable { source, .. } => {
source.as_ref().map(|s| s.as_ref() as &(dyn std::error::Error + 'static))
}
Error::Io(e) => Some(e),
Error::CacheMiss { .. }
| Error::CacheInvalid { .. }
| Error::PoolExhausted { .. }
| Error::KernelPanic { .. } => None,
}
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
}
}
impl From<WarmPoolError> for Error {
fn from(e: WarmPoolError) -> Self {
Error::Vm {
msg: e.to_string(),
source: Some(Box::new(e)),
}
}
}
impl From<PoolClientError> for Error {
fn from(e: PoolClientError) -> Self {
Error::Vm {
msg: e.to_string(),
source: Some(Box::new(e)),
}
}
}
/// Cache behavior when materializing an image from an OCI reference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PullPolicy {
    /// Always rebake, even when a loadable cached snapshot exists.
    Always,
    /// Use the cached snapshot when loadable; bake otherwise (the default).
    Missing,
    /// Never bake: fail if no loadable cached snapshot is present.
    Never,
}

impl Default for PullPolicy {
    fn default() -> Self {
        PullPolicy::Missing
    }
}

impl PullPolicy {
    /// Lowercase token handed to the bake pipeline.
    fn as_bake_str(self) -> &'static str {
        match self {
            PullPolicy::Always => "always",
            PullPolicy::Missing => "missing",
            PullPolicy::Never => "never",
        }
    }
}
/// A baked, restorable VM image: a snapshot file plus the metadata needed to
/// spawn workers that restore from it.
#[derive(Debug, Clone)]
pub struct Image {
    // Path to the restore.snap file this image restores from.
    snapshot_path: PathBuf,
    // Guest memory size in MiB (from metadata.json; defaults to 256).
    pub(crate) memory_mib: u32,
    // Guest vCPU count (from metadata.json; defaults to 1).
    pub(crate) vcpus: u32,
    // First 16 hex chars of the sha256 of the worker binary that baked this
    // snapshot; lets spawn skip the agent probe when the binary is unchanged.
    pub(crate) baker_runtime_sha16: Option<String>,
    // Balloon target (in pages) recorded at bake time, passed to workers.
    pub(crate) balloon_target_pages: Option<u32>,
    // 64-hex-char token passed to workers via --tsi-token, if present.
    pub(crate) tsi_token: Option<String>,
    // Block-device layer files handed to workers via --virtio-blk.
    pub(crate) layers: Vec<PathBuf>,
    // Optional delta squashfs attached as the last block device.
    pub(crate) delta_squashfs: Option<PathBuf>,
    // Host-directory mounts exposed to the guest.
    pub(crate) mounts: Vec<crate::vmm::resources::MountSpec>,
    // "kernel" file shipped inside the snapshot dir, when present.
    pub(crate) bundled_kernel: Option<PathBuf>,
    // Lazily-created worker pool backing acquire(); set at most once.
    pub(crate) hidden_pool: std::sync::OnceLock<Arc<HiddenPool>>,
    // Stash that may hold a pre-warmed worker handed off from bake
    // (presumably consumed on first acquire — handoff path not visible here).
    pub(crate) warm_baked_worker: Arc<crate::bake::WarmStash>,
}
impl Image {
    /// Test hook: is a warm worker from bake still stashed on this image?
    /// A poisoned stash lock reads as "not present".
    #[doc(hidden)]
    pub fn _warm_handoff_present(&self) -> bool {
        match self.warm_baked_worker.inner.lock() {
            Ok(guard) => guard.is_some(),
            Err(_) => false,
        }
    }

    /// Test hook: pid of the stashed warm worker, if one is present.
    #[doc(hidden)]
    pub fn _warm_handoff_pid(&self) -> Option<u32> {
        let guard = self.warm_baked_worker.inner.lock().ok()?;
        guard.as_ref().map(|bw| bw.child.id())
    }
}
/// Internal pool of restored worker VMs backing `Image::acquire`.
#[doc(hidden)]
pub struct HiddenPool {
    // Idle workers plus alive/waiting counters, behind one mutex.
    state: Arc<Mutex<PoolState>>,
    // Signaled when a worker is released or a pool slot frees up.
    available: Arc<Condvar>,
    // Used workers queued after release when restore_on_release is set
    // (presumably re-restored by a background consumer — not visible here).
    dirty: Option<Arc<Mutex<VecDeque<Worker>>>>,
    // Signaled when a worker lands on the dirty queue.
    dirty_pending: Option<Arc<Condvar>>,
    // Directory holding the per-worker unix sockets; removed wholesale on drop.
    socks_dir: PathBuf,
    // Set in Drop so in-flight acquire() calls bail out.
    shutting_down: Arc<AtomicBool>,
    // Recipe for spawning replacement workers.
    spawn_cfg: Arc<SpawnConfig>,
    // Sizing and timeout knobs.
    policy: PoolPolicy,
}
/// Sizing and timeout knobs for [`HiddenPool`].
#[derive(Debug, Clone, Copy)]
struct PoolPolicy {
    // Minimum pool size (not enforced anywhere visible in this chunk).
    min: usize,
    // Hard cap on simultaneously-alive workers.
    max: usize,
    // How long an idle worker may linger (reaping not visible in this chunk).
    idle_timeout: Duration,
    // `None` means acquire() blocks indefinitely.
    acquire_timeout: Option<Duration>,
    // When set, released workers go to the dirty queue instead of idle.
    restore_on_release: bool,
}

impl Default for PoolPolicy {
    /// Defaults: unbounded-from-below pool of at most 64 workers, 60s idle
    /// timeout, 60s acquire timeout, restore-on-release enabled.
    fn default() -> Self {
        PoolPolicy {
            min: 0,
            max: 64,
            idle_timeout: Duration::from_secs(60),
            acquire_timeout: Some(Duration::from_secs(60)),
            restore_on_release: true,
        }
    }
}
/// Mutable pool accounting, guarded by `HiddenPool::state`.
struct PoolState {
    // Workers parked and ready for checkout.
    idle: Vec<IdleEntry>,
    // Workers currently alive: idle + checked out + being spawned.
    alive: usize,
    // Acquirers blocked on the `available` condvar.
    waiting: usize,
}

/// An idle worker plus when it was last returned to the pool
/// (idle_timeout enforcement is not visible in this chunk).
struct IdleEntry {
    worker: Worker,
    last_used: Instant,
}
/// One spawned worker process and the unix sockets used to talk to it.
struct Worker {
    // Handle to the worker child process.
    child: Child,
    // Vsock mux socket path (worker's --vsock-mux argument).
    vsock_mux_path: PathBuf,
    // Vsock exec socket path (worker's --vsock-exec); used for agent control.
    vsock_exec_path: PathBuf,
    // Control socket path the worker connects back on (--pool-worker).
    control_path: PathBuf,
    // Line-oriented control protocol (RESTORE / SNAPSHOT / QUIT ...).
    control: Arc<Mutex<ControlChannel>>,
    // Snapshot this worker most recently restored from.
    last_restore_path: PathBuf,
}

/// Stats parsed from a worker's DONE_SNAPSHOT response; missing or
/// unparsable fields are left at 0.
struct SnapshotStats {
    bytes_written: u64,
    capture_us: u64,
    save_us: u64,
}

/// Blocking, line-based control connection to a worker: buffered reader and
/// plain writer halves of the same unix stream.
struct ControlChannel {
    reader: std::io::BufReader<std::os::unix::net::UnixStream>,
    writer: std::os::unix::net::UnixStream,
}
impl ControlChannel {
    /// Write `line` to the worker, appending a trailing newline when absent,
    /// then flush so the worker sees the command immediately.
    fn send_line(&mut self, line: &str) -> std::io::Result<()> {
        use std::io::Write;
        let needs_newline = !line.ends_with('\n');
        self.writer.write_all(line.as_bytes())?;
        if needs_newline {
            self.writer.write_all(b"\n")?;
        }
        self.writer.flush()
    }

    /// Read the next meaningful line from the worker.
    ///
    /// Background-save notifications (`SAVE_DONE` / `SAVE_FAIL`) arriving out
    /// of band are logged and skipped. EOF maps to `UnexpectedEof`.
    fn read_line(&mut self) -> std::io::Result<String> {
        use std::io::BufRead;
        loop {
            let mut line = String::new();
            if self.reader.read_line(&mut line)? == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "worker control socket closed",
                ));
            }
            let head = line.trim_start();
            let is_bg_save_note =
                head.starts_with("SAVE_DONE ") || head.starts_with("SAVE_FAIL ");
            if !is_bg_save_note {
                return Ok(line);
            }
            tracing::debug!(line = %line.trim_end(), "skipping orthogonal bg-save notification");
        }
    }
}
/// Convert a warm worker handed off from bake into a pool `Worker`,
/// re-wrapping its control streams in a `ControlChannel`.
#[cfg(target_os = "macos")]
fn warm_baked_to_worker(bw: crate::bake::BakedWorker) -> Worker {
    let control = ControlChannel {
        reader: std::io::BufReader::new(bw.control_reader),
        writer: bw.control_writer,
    };
    Worker {
        child: bw.child,
        vsock_mux_path: bw.vsock_mux_path,
        vsock_exec_path: bw.vsock_exec_path,
        control_path: bw.control_path,
        control: Arc::new(Mutex::new(control)),
        last_restore_path: bw.last_restore_path,
    }
}
impl Worker {
fn send_restore(&self, snapshot_path: &Path) -> Result<(), Error> {
let path_str = snapshot_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("RESTORE {path_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if line.starts_with("DONE") {
Ok(())
} else {
Err(Error::vm_msg(format!(
"worker RESTORE: expected DONE response, got: {}",
line.trim()
)))
}
}
fn send_snapshot(&self, out_path: &Path) -> Result<SnapshotStats, Error> {
let path_str = out_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("SNAPSHOT {path_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT") {
let mut stats = SnapshotStats {
bytes_written: 0,
capture_us: 0,
save_us: 0,
};
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("bytes_written=") {
stats.bytes_written = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("capture_us=") {
stats.capture_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("save_us=") {
stats.save_us = v.parse().unwrap_or(0);
}
}
Ok(stats)
} else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
Err(Error::vm_msg(format!(
"worker SNAPSHOT failed: {}",
rest.trim()
)))
} else {
Err(Error::vm_msg(format!(
"worker SNAPSHOT: unexpected response: {}",
line.trim()
)))
}
}
#[allow(dead_code)]
fn send_snapshot_with_base(
&self,
out_path: &Path,
base_path: &Path,
) -> Result<SnapshotStats, Error> {
let path_str = out_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot out path is not valid UTF-8".to_owned()))?;
let base_str = base_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot base path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("SNAPSHOT {path_str} base={base_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT") {
let mut stats = SnapshotStats {
bytes_written: 0,
capture_us: 0,
save_us: 0,
};
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("bytes_written=") {
stats.bytes_written = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("capture_us=") {
stats.capture_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("save_us=") {
stats.save_us = v.parse().unwrap_or(0);
}
}
Ok(stats)
} else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
Err(Error::vm_msg(format!(
"worker SNAPSHOT (with base) failed: {}",
rest.trim()
)))
} else {
Err(Error::vm_msg(format!(
"worker SNAPSHOT (with base): unexpected response: {}",
line.trim()
)))
}
}
#[allow(dead_code)]
fn send_snapshot_async(&self, out_path: &Path) -> Result<SnapshotStats, Error> {
let path_str = out_path
.to_str()
.ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
let mut ctl = self
.control
.lock()
.map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
ctl.send_line(&format!("SNAPSHOT_ASYNC {path_str}"))
.map_err(Error::Io)?;
let line = ctl.read_line().map_err(Error::Io)?;
if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT_ASYNC") {
let mut stats = SnapshotStats {
bytes_written: 0,
capture_us: 0,
save_us: 0,
};
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("bytes_written=") {
stats.bytes_written = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("capture_us=") {
stats.capture_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("save_us=") {
stats.save_us = v.parse().unwrap_or(0);
}
}
Ok(stats)
} else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
Err(Error::vm_msg(format!(
"worker SNAPSHOT_ASYNC failed: {}",
rest.trim()
)))
} else {
Err(Error::vm_msg(format!(
"worker SNAPSHOT_ASYNC: unexpected response: {}",
line.trim()
)))
}
}
#[cfg_attr(not(all(target_os = "macos", target_arch = "aarch64")), allow(dead_code))]
fn send_smpark_park(&self) -> Result<bool, Error> {
let body = serde_json::json!({ "action": "smpark_park" });
match crate::exec::send_control_with_ack(
&self.vsock_exec_path,
&body,
Some(Duration::from_secs(5)),
) {
Ok(_) => Ok(true),
Err(e) => {
tracing::debug!(error = %e, "smpark_park unavailable; skipping");
Ok(false)
}
}
}
#[cfg_attr(not(all(target_os = "macos", target_arch = "aarch64")), allow(dead_code))]
fn send_smpark_unpark(&self) -> Result<bool, Error> {
let body = serde_json::json!({ "action": "smpark_unpark" });
match crate::exec::send_control_with_ack(
&self.vsock_exec_path,
&body,
Some(Duration::from_secs(5)),
) {
Ok(_) => Ok(true),
Err(e) => {
tracing::debug!(error = %e, "smpark_unpark unavailable; skipping");
Ok(false)
}
}
}
fn shutdown(&mut self) {
if let Ok(mut ctl) = self.control.lock() {
let _ = ctl.send_line("QUIT");
}
let deadline = Instant::now() + Duration::from_millis(100);
loop {
match self.child.try_wait() {
Ok(Some(_)) => break,
Ok(None) if Instant::now() < deadline => {
std::thread::sleep(Duration::from_millis(2));
}
_ => {
let _ = self.child.kill();
let _ = self.child.wait();
break;
}
}
}
let _ = std::fs::remove_file(&self.vsock_mux_path);
let _ = std::fs::remove_file(&self.vsock_exec_path);
let _ = std::fs::remove_file(&self.control_path);
let mut h = self.vsock_mux_path.clone();
h.set_extension("handoff");
let _ = std::fs::remove_file(&h);
}
}
/// Everything `spawn_one` needs to launch and restore a pool worker.
struct SpawnConfig {
    // Worker executable to spawn.
    worker_bin: PathBuf,
    // restore.snap file workers restore from (may still be landing on disk).
    snapshot_path: PathBuf,
    // Block-device layer files (each passed as --virtio-blk).
    layers: Vec<PathBuf>,
    // Optional delta squashfs attached after the layers.
    delta_squashfs: Option<PathBuf>,
    // Host-directory mounts (each passed as --mount host:tag).
    mounts: Vec<crate::vmm::resources::MountSpec>,
    // Guest memory size in MiB.
    memory_mib: u32,
    // Guest vCPU count.
    vcpus: u32,
    // Directory where the per-worker unix sockets are created.
    socks_dir: PathBuf,
    // Prefix for the generated socket file names.
    name_prefix: String,
    // Budget for snapshot arrival, control connect, and handshake.
    spawn_timeout: Duration,
    // sha16 of the binary that baked the snapshot; enables probe skipping.
    baker_runtime_sha16: Option<String>,
    // Optional --balloon-target-pages value.
    balloon_target_pages: Option<u32>,
    // Optional --tsi-token value.
    tsi_token: Option<String>,
}
impl SpawnConfig {
    /// Spawn one pooled worker process, restore it from `snapshot_path`, and
    /// run the READY/RESTORE/DONE handshake over a fresh control socket.
    ///
    /// On failure the control socket file is removed; the mux/exec sockets and
    /// any spawned child are left for the worker/OS to clean up.
    fn spawn_one(&self) -> Result<Worker, Error> {
        use std::os::unix::net::UnixListener;
        // Derive three unique socket paths (mux, exec, control) from one suffix.
        let suffix = unique_suffix();
        let vsock_mux_path = self
            .socks_dir
            .join(format!("{}-{:016x}.sock", self.name_prefix, suffix));
        let vsock_exec_path = {
            let mut p = vsock_mux_path.clone();
            let mut name = p.file_name().unwrap().to_owned();
            name.push("-exec");
            p.set_file_name(name);
            p
        };
        let control_path = {
            let mut p = vsock_mux_path.clone();
            let mut name = p.file_name().unwrap().to_owned();
            name.push("-ctl");
            p.set_file_name(name);
            p
        };
        // Stale files from a previous crashed worker would make bind() fail.
        let _ = std::fs::remove_file(&vsock_mux_path);
        let _ = std::fs::remove_file(&vsock_exec_path);
        let _ = std::fs::remove_file(&control_path);
        let ctl_listener = UnixListener::bind(&control_path).map_err(|e| {
            Error::vm_msg(format!(
                "bind control socket {}: {e}",
                control_path.display()
            ))
        })?;
        // A pipelined bake may still be writing the snapshot in the background;
        // poll with capped exponential backoff until the file lands.
        if !self.snapshot_path.is_file() {
            let poll_t0 = Instant::now();
            let poll_deadline = poll_t0 + self.spawn_timeout;
            let mut backoff = Duration::from_millis(2);
            loop {
                if self.snapshot_path.is_file() {
                    if std::env::var_os("SUPERMACHINE_TIMINGS").is_some() {
                        eprintln!(
                            "[spawn_one] waited {:?} for snapshot file to land at {}",
                            poll_t0.elapsed(),
                            self.snapshot_path.display(),
                        );
                    }
                    break;
                }
                if Instant::now() > poll_deadline {
                    let _ = std::fs::remove_file(&control_path);
                    return Err(Error::vm_msg(format!(
                        "worker spawn: snapshot file {} did not appear within {:?} \
                        (bg save in flight from a pipelined bake — increase \
                        `VmConfig::restore_timeout` if your disk is slow)",
                        self.snapshot_path.display(),
                        self.spawn_timeout
                    )));
                }
                std::thread::sleep(backoff);
                backoff = (backoff * 2).min(Duration::from_millis(50));
            }
        }
        // Assemble the worker command line: block devices, mounts, sizing,
        // restore source, sockets, and optional balloon/TSI flags.
        let mut cmd = Command::new(&self.worker_bin);
        for layer in &self.layers {
            cmd.arg("--virtio-blk").arg(layer);
        }
        if let Some(delta) = &self.delta_squashfs {
            cmd.arg("--virtio-blk").arg(delta);
        }
        for m in &self.mounts {
            cmd.arg("--mount")
                .arg(format!("{}:{}", m.host_path, m.guest_tag));
        }
        cmd.arg("--memory").arg(self.memory_mib.to_string());
        cmd.arg("--vcpus").arg(self.vcpus.to_string());
        cmd.arg("--restore-from").arg(&self.snapshot_path);
        cmd.arg("--cow-restore");
        cmd.arg("--vsock-mux").arg(&vsock_mux_path);
        cmd.arg("--vsock-exec").arg(&vsock_exec_path);
        cmd.arg("--pool-worker").arg(&control_path);
        if let Some(pages) = self.balloon_target_pages {
            cmd.arg("--balloon-target-pages").arg(pages.to_string());
        }
        if let Some(hex) = self.tsi_token.as_deref() {
            cmd.arg("--tsi-token").arg(hex);
        }
        // Worker output is discarded unless SUPERMACHINE_WORKER_LOG=1/true.
        let log_to_stdio = std::env::var("SUPERMACHINE_WORKER_LOG")
            .map(|v| v == "1" || v == "true")
            .unwrap_or(false);
        if !log_to_stdio {
            cmd.stdout(Stdio::null()).stderr(Stdio::null());
        }
        let __t0 = Instant::now();
        let child = cmd
            .spawn()
            .map_err(|e| Error::vm_msg(format!("spawn worker {}: {e}", self.worker_bin.display())))?;
        let __t_spawned = __t0.elapsed();
        // Accept the worker's control connection without blocking forever:
        // poll a nonblocking listener with capped backoff until the deadline.
        ctl_listener
            .set_nonblocking(true)
            .map_err(|e| Error::vm_msg(format!("set control listener nonblocking: {e}")))?;
        let deadline = Instant::now() + self.spawn_timeout;
        let mut backoff = Duration::from_millis(1);
        let stream = loop {
            match ctl_listener.accept() {
                Ok((s, _)) => break s,
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    if Instant::now() > deadline {
                        let _ = std::fs::remove_file(&control_path);
                        return Err(Error::vm_msg(format!(
                            "worker spawn: control connect did not arrive within {:?}",
                            self.spawn_timeout
                        )));
                    }
                    std::thread::sleep(backoff);
                    backoff = (backoff * 2).min(Duration::from_millis(10));
                }
                Err(e) => {
                    let _ = std::fs::remove_file(&control_path);
                    return Err(Error::vm_msg(format!(
                        "worker spawn: control accept: {e}"
                    )));
                }
            }
        };
        // Back to blocking mode for the line-oriented handshake.
        stream
            .set_nonblocking(false)
            .map_err(|e| Error::vm_msg(format!("set control stream blocking: {e}")))?;
        let writer = stream
            .try_clone()
            .map_err(|e| Error::vm_msg(format!("clone control stream: {e}")))?;
        let mut control = ControlChannel {
            reader: std::io::BufReader::new(stream),
            writer,
        };
        // Handshake: worker announces READY, we issue the initial RESTORE,
        // worker answers DONE once the VM is restored.
        let ready = match control.read_line() {
            Ok(l) => l,
            Err(e) => {
                let _ = std::fs::remove_file(&control_path);
                return Err(Error::vm_msg(format!(
                    "worker spawn: read READY: {e}"
                )));
            }
        };
        if ready.trim() != "READY" {
            let _ = std::fs::remove_file(&control_path);
            return Err(Error::vm_msg(format!(
                "worker spawn: expected READY, got: {}",
                ready.trim()
            )));
        }
        let snap_path_str = self.snapshot_path.to_string_lossy().to_string();
        if let Err(e) = control.send_line(&format!("RESTORE {snap_path_str}")) {
            let _ = std::fs::remove_file(&control_path);
            return Err(Error::vm_msg(format!(
                "worker spawn: send initial RESTORE: {e}"
            )));
        }
        let done = match control.read_line() {
            Ok(l) => l,
            Err(e) => {
                let _ = std::fs::remove_file(&control_path);
                return Err(Error::vm_msg(format!(
                    "worker spawn: read initial DONE: {e}"
                )));
            }
        };
        if !done.starts_with("DONE") {
            let _ = std::fs::remove_file(&control_path);
            return Err(Error::vm_msg(format!(
                "worker spawn: expected initial DONE, got: {}",
                done.trim()
            )));
        }
        if std::env::var_os("SUPERMACHINE_TIMINGS").is_some() {
            eprintln!(
                "[spawn_one] spawn={:?} accept_to_done={:?} total={:?}",
                __t_spawned,
                __t0.elapsed() - __t_spawned,
                __t0.elapsed()
            );
        }
        // Skip the agent probe when the snapshot was baked by this exact
        // worker binary (compared via the cached sha16 of the binary).
        let probe_skip = self
            .baker_runtime_sha16
            .as_deref()
            .and_then(|stored| {
                current_worker_sha16(&self.worker_bin)
                    .map(|current| stored == current.as_str())
            })
            .unwrap_or(false);
        if !probe_skip {
            if let Err(e) = probe_agent_protocol(&vsock_exec_path, self.vcpus) {
                // Multi-vCPU restores are known to stall the guest; warn and
                // continue so workload exec surfaces the real error if hung.
                if self.vcpus > 1 && e.is_likely_multi_vcpu_restore_stall() {
                    eprintln!(
                        "supermachine: WARNING multi-vCPU ({}vCPU) snapshot agent probe \
                        timed out — guest may be RCU-stalled after restore. \
                        Continuing (multi-vCPU is unsupported per design); \
                        workload exec will surface a real error if the guest is hung.",
                        self.vcpus
                    );
                } else {
                    let _ = std::fs::remove_file(&control_path);
                    return Err(e);
                }
            }
        }
        Ok(Worker {
            child,
            vsock_mux_path,
            vsock_exec_path,
            control_path,
            control: Arc::new(Mutex::new(control)),
            last_restore_path: self.snapshot_path.clone(),
        })
    }
}
/// Minimum agent protocol version this host library accepts; snapshots whose
/// agent reports an older version must be rebaked (see `probe_agent_protocol`).
const HOST_AGENT_PROTOCOL_MIN: u32 = 2;
/// First 16 hex chars of the sha256 of `worker_bin`, computed via the
/// external `shasum` tool and memoized per (path, length, mtime).
/// Returns `None` on any metadata, process, or parse failure.
fn current_worker_sha16(worker_bin: &Path) -> Option<String> {
    use std::sync::Mutex;
    // (path, file length, mtime in ns, sha16) of the last binary we hashed.
    static CACHE: Mutex<Option<(PathBuf, u64, u128, String)>> = Mutex::new(None);
    let meta = std::fs::metadata(worker_bin).ok()?;
    let len = meta.len();
    let mtime_ns = meta
        .modified()
        .ok()?
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_nanos();
    // Cache hit requires path, size, and mtime to all match.
    if let Ok(guard) = CACHE.lock() {
        match guard.as_ref() {
            Some((cached_path, cached_len, cached_mtime, sha))
                if cached_path == worker_bin && *cached_len == len && *cached_mtime == mtime_ns =>
            {
                return Some(sha.clone());
            }
            _ => {}
        }
    }
    let output = std::process::Command::new("shasum")
        .arg("-a")
        .arg("256")
        .arg(worker_bin)
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    // shasum prints "<digest>  <path>"; keep the leading 16 hex chars.
    let stdout = String::from_utf8_lossy(&output.stdout);
    let digest = stdout.split_whitespace().next()?;
    let sha16 = digest[..digest.len().min(16)].to_owned();
    if let Ok(mut guard) = CACHE.lock() {
        *guard = Some((worker_bin.to_path_buf(), len, mtime_ns, sha16.clone()));
    }
    Some(sha16)
}
fn probe_agent_protocol(vsock_exec_path: &Path, vcpus: u32) -> Result<(), Error> {
let timeout = if vcpus > 1 {
Duration::from_secs(1)
} else {
Duration::from_secs(10)
};
let body = serde_json::json!({ "action": "probe" });
let ack = match crate::exec::send_control_with_ack(
vsock_exec_path,
&body,
Some(timeout),
) {
Ok(a) => a,
Err(e) => {
return Err(Error::vm_msg(format!(
"agent in this snapshot is from an older supermachine release \
(probe failed: {e}). Rebake the snapshot to pick up the new \
agent: rm -rf the snapshot dir and re-run your bake (e.g. \
`supermachine pull <image> --name <name>`)."
)));
}
};
let proto = ack.get("protocol").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
if proto < HOST_AGENT_PROTOCOL_MIN {
return Err(Error::vm_msg(format!(
"agent in this snapshot speaks protocol v{proto} but this \
supermachine library expects v{HOST_AGENT_PROTOCOL_MIN}+. The \
snapshot was baked against a previous release; rebake to pick up \
the new agent (rm -rf the snapshot dir and re-run your bake)."
)));
}
Ok(())
}
impl std::fmt::Debug for HiddenPool {
    /// Summarize the pool; counters render as `usize::MAX` when the state
    /// mutex is poisoned.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (alive, idle) = match self.state.lock() {
            Ok(guard) => (guard.alive, guard.idle.len()),
            Err(_) => (usize::MAX, usize::MAX),
        };
        f.debug_struct("HiddenPool")
            .field("socks_dir", &self.socks_dir)
            .field("alive", &alive)
            .field("idle", &idle)
            .finish()
    }
}
impl Drop for HiddenPool {
    fn drop(&mut self) {
        // Flip the flag first and wake every waiter so blocked acquire()
        // calls (and any dirty-queue consumer) can observe the shutdown.
        self.shutting_down.store(true, Ordering::SeqCst);
        self.available.notify_all();
        if let Some(c) = self.dirty_pending.as_ref() {
            c.notify_all();
        }
        // Best-effort teardown of every idle and dirty worker; a poisoned
        // lock simply skips that set.
        if let Ok(mut s) = self.state.lock() {
            while let Some(mut e) = s.idle.pop() {
                e.worker.shutdown();
                s.alive = s.alive.saturating_sub(1);
            }
            if let Some(d) = self.dirty.as_ref() {
                if let Ok(mut q) = d.lock() {
                    while let Some(mut w) = q.pop_front() {
                        w.shutdown();
                        s.alive = s.alive.saturating_sub(1);
                    }
                }
            }
        }
        // All per-worker sockets live under socks_dir; remove it wholesale.
        let _ = std::fs::remove_dir_all(&self.socks_dir);
    }
}
impl HiddenPool {
    /// Check out a live worker: reuse an idle one, spawn a new one while
    /// under `policy.max`, otherwise block on the `available` condvar until a
    /// slot frees or the acquire timeout elapses.
    fn acquire(&self) -> Result<Worker, Error> {
        let acquire_t0 = Instant::now();
        let acquire_timeout = self.policy.acquire_timeout;
        let mut state = self
            .state
            .lock()
            .map_err(|_| Error::vm_msg("pool mutex poisoned".to_owned()))?;
        loop {
            // Prefer idle workers, discarding any whose process has exited
            // (or whose status cannot be queried).
            while let Some(mut entry) = state.idle.pop() {
                match entry.worker.child.try_wait() {
                    // Still running: hand it out.
                    Ok(None) => return Ok(entry.worker),
                    Ok(Some(_status)) => {
                        state.alive = state.alive.saturating_sub(1);
                        continue;
                    }
                    Err(_) => {
                        state.alive = state.alive.saturating_sub(1);
                        continue;
                    }
                }
            }
            if self.shutting_down.load(Ordering::SeqCst) {
                return Err(Error::vm_msg("pool is shutting down".to_owned()));
            }
            // Room to grow: reserve the slot before dropping the lock so
            // concurrent acquirers see it as taken while we spawn.
            if state.alive < self.policy.max {
                state.alive += 1;
                drop(state);
                let spawned = self.spawn_cfg.spawn_one();
                match spawned {
                    Ok(w) => return Ok(w),
                    Err(e) => {
                        // Spawn failed: give the slot back and wake waiters.
                        if let Ok(mut s) = self.state.lock() {
                            s.alive = s.alive.saturating_sub(1);
                        }
                        self.available.notify_all();
                        return Err(e);
                    }
                }
            }
            // Pool is at max: fail fast if the overall budget is spent.
            if let Some(total) = acquire_timeout {
                if acquire_t0.elapsed() >= total {
                    return Err(Error::pool_exhausted(format!(
                        "acquire timed out after {total:?}; pool at max ({})",
                        self.policy.max
                    )));
                }
            }
            // Otherwise wait (bounded by the remaining budget, if any) for a
            // release/slot notification, then loop and rescan.
            state.waiting += 1;
            let (new_state, timed_out) = match acquire_timeout {
                None => match self.available.wait(state) {
                    Ok(s) => (s, false),
                    Err(_) => {
                        return Err(Error::vm_msg("pool condvar poisoned".to_owned()))
                    }
                },
                Some(total) => {
                    let remaining = total.saturating_sub(acquire_t0.elapsed());
                    match self.available.wait_timeout(state, remaining) {
                        Ok((s, r)) => (s, r.timed_out()),
                        Err(_) => {
                            return Err(Error::vm_msg(
                                "pool condvar poisoned".to_owned(),
                            ))
                        }
                    }
                }
            };
            state = new_state;
            state.waiting = state.waiting.saturating_sub(1);
            if timed_out {
                return Err(Error::pool_exhausted(format!(
                    "acquire timed out after {:?}; pool at max ({})",
                    acquire_timeout.unwrap_or_default(),
                    self.policy.max
                )));
            }
        }
    }

    /// Return a worker after use.
    ///
    /// With `restore_on_release` disabled the worker goes straight back to
    /// the idle list. Otherwise it is queued as "dirty" (presumably
    /// re-restored by a consumer of the dirty queue — not visible in this
    /// chunk); if no dirty queue is configured the worker is shut down.
    fn release(&self, worker: Worker) {
        if !self.policy.restore_on_release {
            if let Ok(mut s) = self.state.lock() {
                s.idle.push(IdleEntry {
                    worker,
                    last_used: Instant::now(),
                });
                self.available.notify_all();
            }
            return;
        }
        if let Some(d) = self.dirty.as_ref() {
            if let Ok(mut q) = d.lock() {
                q.push_back(worker);
            }
            if let Some(c) = self.dirty_pending.as_ref() {
                c.notify_all();
            }
        } else {
            // No dirty queue: tear the worker down and free its slot.
            let mut w = worker;
            w.shutdown();
            if let Ok(mut s) = self.state.lock() {
                s.alive = s.alive.saturating_sub(1);
            }
            self.available.notify_all();
        }
    }
}
impl Image {
/// Load an image from a baked snapshot: `path` may be the snapshot directory
/// or the `restore.snap` file itself; `metadata.json` must sit alongside.
pub fn from_snapshot(path: impl Into<PathBuf>) -> Result<Self, Error> {
    Self::from_snapshot_inner(path.into(), false)
}

/// Like [`Image::from_snapshot`] but tolerates a `restore.snap` that has not
/// landed on disk yet (background save in flight); metadata must still exist.
pub(crate) fn from_snapshot_pending(path: impl Into<PathBuf>) -> Result<Self, Error> {
    Self::from_snapshot_inner(path.into(), true)
}
/// Shared loader behind `from_snapshot` / `from_snapshot_pending`.
///
/// Resolves the snapshot + metadata paths, parses `metadata.json`, and builds
/// the `Image`. When `allow_pending` is set the `restore.snap` file itself may
/// be absent (a background save is still writing it); everything else is
/// required either way. Relative paths inside the metadata are resolved
/// against the metadata file's directory.
fn from_snapshot_inner(path: PathBuf, allow_pending: bool) -> Result<Self, Error> {
    // Accept either the snapshot directory or the restore.snap file itself.
    let (snapshot_path, metadata_path) = if path.is_dir() {
        (path.join("restore.snap"), path.join("metadata.json"))
    } else if path.is_file() {
        let parent = path.parent().ok_or_else(|| {
            Error::image_msg(format!("snapshot path has no parent dir: {}", path.display()))
        })?;
        (path.clone(), parent.join("metadata.json"))
    } else {
        // Previously split into identical `allow_pending` and default arms
        // returning the same error; merged since the bodies were duplicates.
        return Err(Error::image_msg(format!(
            "snapshot path not found: {}",
            path.display()
        )));
    };
    if !allow_pending && !snapshot_path.is_file() {
        return Err(Error::image_msg(format!(
            "snapshot file not found: {}",
            snapshot_path.display()
        )));
    }
    if !metadata_path.is_file() {
        return Err(Error::image_msg(format!(
            "metadata.json not found alongside snapshot at {}",
            metadata_path.display()
        )));
    }
    let meta_text = std::fs::read_to_string(&metadata_path)
        .map_err(|e| Error::image_msg(format!("read {}: {e}", metadata_path.display())))?;
    let meta: serde_json::Value = serde_json::from_str(&meta_text)
        .map_err(|e| Error::image_msg(format!("parse {}: {e}", metadata_path.display())))?;
    warn_if_snapshot_version_mismatch(&meta, &snapshot_path);
    // Sizing, with conservative defaults when the metadata omits a field.
    let memory_mib = meta
        .get("memory_mib")
        .and_then(|v| v.as_u64())
        .map(|v| v as u32)
        .unwrap_or(256);
    let vcpus = meta
        .get("vcpus")
        .and_then(|v| v.as_u64())
        .map(|v| v as u32)
        .unwrap_or(1);
    // Relative metadata paths are interpreted next to metadata.json.
    let metadata_dir = metadata_path
        .parent()
        .map(Path::to_path_buf)
        .unwrap_or_else(|| PathBuf::from("."));
    let resolve_path = |s: &str| -> PathBuf {
        let p = PathBuf::from(s);
        if p.is_absolute() {
            p
        } else {
            metadata_dir.join(p)
        }
    };
    let layers: Vec<PathBuf> = meta
        .get("layers")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|x| x.as_str().map(&resolve_path))
                .collect()
        })
        .unwrap_or_default();
    let delta_squashfs = meta
        .get("delta_squashfs")
        .and_then(|v| v.as_str())
        .map(&resolve_path);
    // Mount entries need host_path and guest_tag; the symlink policy is
    // optional and falls back to its default.
    let mounts: Vec<crate::vmm::resources::MountSpec> = meta
        .get("mounts")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|x| {
                    let host = x.get("host_path").and_then(|v| v.as_str())?;
                    let tag = x.get("guest_tag").and_then(|v| v.as_str())?;
                    let policy = x
                        .get("symlinks")
                        .and_then(|v| v.as_str())
                        .and_then(parse_symlink_policy)
                        .unwrap_or_default();
                    Some(
                        crate::vmm::resources::MountSpec::new(host, tag)
                            .with_symlinks(policy),
                    )
                })
                .collect()
        })
        .unwrap_or_default();
    // A kernel shipped inside the snapshot dir takes effect when present.
    let bundled_kernel = {
        let cand = metadata_dir.join("kernel");
        if cand.is_file() {
            Some(cand)
        } else {
            None
        }
    };
    let baker_runtime_sha16 = meta
        .get("runtime_sha16")
        .and_then(|v| v.as_str())
        .map(|s| s.to_owned());
    // Zero is treated the same as absent.
    let balloon_target_pages = meta
        .get("balloon_target_pages")
        .and_then(|v| v.as_u64())
        .and_then(|n| u32::try_from(n).ok())
        .filter(|n| *n > 0);
    // The token must be exactly 64 hex chars; normalized to lowercase.
    let tsi_token = meta
        .get("tsi_token")
        .and_then(|v| v.as_str())
        .filter(|s| s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()))
        .map(|s| s.to_ascii_lowercase());
    Ok(Self {
        snapshot_path,
        memory_mib,
        vcpus,
        baker_runtime_sha16,
        balloon_target_pages,
        tsi_token,
        layers,
        delta_squashfs,
        mounts,
        bundled_kernel,
        hidden_pool: std::sync::OnceLock::new(),
        warm_baked_worker: Arc::new(crate::bake::WarmStash::new(None)),
    })
}
/// Load or bake a snapshot for `image_ref` with the default pull policy
/// (`Missing`) in the default snapshots directory.
pub fn from_oci(image_ref: &str) -> Result<Self, Error> {
    Self::from_oci_with_policy(image_ref, PullPolicy::default())
}

/// Load or bake a snapshot for `image_ref` in the default snapshots
/// directory, honoring `policy`.
pub fn from_oci_with_policy(
    image_ref: &str,
    policy: PullPolicy,
) -> Result<Self, Error> {
    let snapshots_dir = default_snapshots_dir();
    Self::from_oci_to_dir(image_ref, policy, &snapshots_dir, None)
}
/// Load a cached snapshot for `image_ref` from `snapshots_dir` (under `name`
/// or a name derived from the reference), or bake a new one when the cache is
/// missing/unusable and `policy` allows it.
///
/// With `PullPolicy::Never`, a present-but-unloadable snapshot maps to
/// `Error::CacheInvalid` and an absent one to `Error::CacheMiss`.
pub fn from_oci_to_dir(
    image_ref: &str,
    policy: PullPolicy,
    snapshots_dir: &Path,
    name: Option<&str>,
) -> Result<Self, Error> {
    let derived = name
        .map(|s| s.to_owned())
        .unwrap_or_else(|| crate::bake::snapshot_name_for_image(image_ref));
    let snap_dir = snapshots_dir.join(&derived);
    // Probe the cache exactly once and reuse the parsed image on a hit
    // (previously the snapshot was loaded a second time after an is_ok probe).
    match (policy, Self::from_snapshot(&snap_dir)) {
        (PullPolicy::Never, Ok(img)) | (PullPolicy::Missing, Ok(img)) => return Ok(img),
        (PullPolicy::Never, Err(_)) => {
            let restore_snap = snap_dir.join("restore.snap");
            if restore_snap.is_file() {
                return Err(Error::cache_invalid(format!(
                    "snapshot present at {} but not loadable on this binary; \
                    rebake required (PullPolicy::Never won't auto-rebake)",
                    snap_dir.display()
                )));
            }
            return Err(Error::cache_miss(format!(
                "no cached snapshot for {image_ref} at {} (PullPolicy::Never)",
                snap_dir.display()
            )));
        }
        // Always, or Missing with an unusable cache: fall through and bake.
        _ => {}
    }
    let root = repo_root_for_bake()?;
    let request = crate::bake::BakeRequest {
        image: image_ref.to_owned(),
        name: name.map(|s| s.to_owned()),
        runtime: "supermachine".to_owned(),
        guest_port: 80,
        memory_mib: 256,
        vcpus: 1,
        pull_policy: policy.as_bake_str().to_owned(),
        snapshots_dir: snapshots_dir.to_path_buf(),
        cmd_override: None,
        extra_args: Vec::new(),
    };
    let bake_t0 = std::time::Instant::now();
    crate::bake::run_push(&request, bake_t0, &root)
        .map_err(|e| map_bake_error(&request.image, e))?;
    Self::from_snapshot(&snap_dir)
}
/// Start a fluent [`OciImageBuilder`] for `image_ref`.
pub fn builder(image_ref: impl Into<String>) -> OciImageBuilder {
    OciImageBuilder::new(image_ref)
}

/// Bake (or reuse) a named snapshot, letting `configure` customize the
/// builder before it is built.
pub fn ensure_baked<F>(
    name: impl Into<String>,
    image_ref: impl Into<String>,
    configure: F,
) -> Result<Image, Error>
where
    F: FnOnce(OciImageBuilder) -> OciImageBuilder,
{
    let base = OciImageBuilder::new(image_ref).with_name(name);
    configure(base).build()
}
/// Path of the snapshot this image restores from.
pub fn snapshot_path(&self) -> &Path {
    &self.snapshot_path
}

/// Guest memory size recorded in the snapshot metadata, in MiB.
pub fn memory_mib(&self) -> u32 {
    self.memory_mib
}

/// vCPU count recorded in the snapshot metadata.
pub fn vcpus(&self) -> u32 {
    self.vcpus
}

/// Test-only accessor for the baked balloon target (pages), if recorded.
#[doc(hidden)]
pub fn balloon_target_pages_for_test(&self) -> Option<u32> {
    self.balloon_target_pages
}
/// Boot a dedicated (non-pooled) VM from this image with `config` overrides.
pub fn start(&self, config: &VmConfig) -> Result<Vm, Error> {
    Vm::start(self, config)
}

/// Check a VM out of this image's default pool using default configuration.
pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
    let config = VmConfig::default();
    self.acquire_with(&config)
}
/// Check a VM out of this image's default pool, creating the pool on first
/// use with `config` applied (later calls ignore a different `config`).
pub fn acquire_with(&self, config: &VmConfig) -> Result<PooledVm<'_>, Error> {
    let _span = tracing::info_span!(
        "supermachine.acquire",
        memory_mib = self.memory_mib,
        vcpus = self.vcpus,
    )
    .entered();
    let pool = self.ensure_default_pool(config)?;
    let worker = pool.acquire()?;
    // Record effective sizing so PooledVm::snapshot can write faithful metadata.
    let meta = ImageMeta {
        memory_mib: config.memory_mib.unwrap_or(self.memory_mib),
        vcpus: config.vcpus.unwrap_or(self.vcpus),
        layers: self.layers.clone(),
        delta_squashfs: self.delta_squashfs.clone(),
    };
    let vm = Vm {
        pool: None,
        vsock_mux_path: worker.vsock_mux_path.clone(),
        vsock_exec_path: worker.vsock_exec_path.clone(),
        own_vsock_mux_dir: None,
        skip_cleanup: true,
        image_meta: Some(Arc::new(meta)),
    };
    Ok(PooledVm {
        vm: Some(vm),
        worker: Some(worker),
        pool_arc: Arc::clone(pool),
        _image: std::marker::PhantomData,
    })
}
/// Begin building an explicit worker pool for this image (see [`PoolBuilder`]).
pub fn pool(&self) -> PoolBuilder<'_> {
    PoolBuilder {
        image: self,
        policy: PoolPolicy::default(),
        vm_config: VmConfig::default(),
    }
}
/// Build the shared worker pool plus its background maintenance threads.
///
/// Resolves the worker binary (platform-specific), creates the vsock socket
/// directory, spawns `policy.min` workers (reusing a still-live warm worker
/// handed off by the baker when available), then starts the replenisher,
/// optional restorer(s), and optional janitor threads. The thread handles are
/// intentionally dropped (detached); the threads exit when they observe
/// `shutting_down`.
fn build_pool_arc(
    &self,
    config: &VmConfig,
    policy: PoolPolicy,
) -> Result<Arc<HiddenPool>, Error> {
    // macOS: locate the sibling worker binary; it is verified/signed below.
    #[cfg(target_os = "macos")]
    let worker_bin = crate::codesign::locate_worker_bin().ok_or_else(|| {
        Error::assets_msg(
            "supermachine-worker binary not found (looked for sibling of \
             current_exe and target/release/supermachine-worker). Set \
             SUPERMACHINE_WORKER_BIN if you have it elsewhere."
                .to_owned(),
        )
    })?;
    // Other platforms: the worker binary must be supplied explicitly.
    #[cfg(not(target_os = "macos"))]
    let worker_bin: PathBuf = std::env::var_os("SUPERMACHINE_WORKER_BIN")
        .map(PathBuf::from)
        .ok_or_else(|| {
            Error::assets_msg(
                "SUPERMACHINE_WORKER_BIN must be set on this platform".to_owned(),
            )
        })?;
    #[cfg(target_os = "macos")]
    {
        crate::codesign::verify_worker_version(&worker_bin)
            .map_err(Error::vm_msg)?;
        // Best-effort: a signing failure here is tolerated (spawn will surface it).
        let _ = crate::codesign::ensure_worker_signed(&worker_bin);
    }
    // Directory for per-worker vsock mux/exec sockets; defaults to a
    // pid+suffix-unique /tmp path when not pinned by the VmConfig.
    let socks_dir = match &config.vsock_mux_dir {
        Some(d) => d.clone(),
        None => PathBuf::from(format!(
            "/tmp/sm-pool-{}-{:x}",
            std::process::id(),
            unique_suffix(),
        )),
    };
    std::fs::create_dir_all(&socks_dir).map_err(Error::Io)?;
    // VmConfig overrides win over the image's baked-in sizing.
    let memory_mib = config.memory_mib.unwrap_or(self.memory_mib);
    let vcpus = config.vcpus.unwrap_or(self.vcpus);
    let spawn_timeout = config
        .restore_timeout
        .unwrap_or_else(|| Duration::from_secs(30));
    let name_prefix = "w".to_owned();
    let spawn_cfg = Arc::new(SpawnConfig {
        worker_bin,
        snapshot_path: self.snapshot_path.clone(),
        layers: self.layers.clone(),
        delta_squashfs: self.delta_squashfs.clone(),
        mounts: self.mounts.clone(),
        memory_mib,
        vcpus,
        socks_dir: socks_dir.clone(),
        name_prefix,
        spawn_timeout,
        baker_runtime_sha16: self.baker_runtime_sha16.clone(),
        balloon_target_pages: self.balloon_target_pages,
        tsi_token: self.tsi_token.clone(),
    });
    // If the baker handed us a still-running warm worker, seed the pool with
    // it; `try_wait()` filters out a worker whose process already exited.
    let claimed_warm = self
        .warm_baked_worker
        .take()
        .and_then(|mut bw| {
            match bw.child.try_wait() {
                Ok(None) => Some(bw),
                Ok(Some(_)) => None,
                Err(_) => None,
            }
        });
    let initial = policy.min;
    let mut idle: Vec<IdleEntry> = Vec::with_capacity(initial.max(1));
    let extra_warm: usize;
    if let Some(bw) = claimed_warm {
        idle.push(IdleEntry {
            worker: warm_baked_to_worker(bw),
            last_used: Instant::now(),
        });
        extra_warm = 1;
    } else {
        extra_warm = 0;
    }
    // Spawn the remaining workers; parallelize when more than one is needed.
    let to_spawn = initial.saturating_sub(extra_warm);
    if to_spawn == 1 {
        idle.push(IdleEntry {
            worker: spawn_cfg.spawn_one()?,
            last_used: Instant::now(),
        });
    } else if to_spawn > 1 {
        let mut handles = Vec::with_capacity(to_spawn);
        for _ in 0..to_spawn {
            let cfg = Arc::clone(&spawn_cfg);
            handles.push(std::thread::spawn(move || cfg.spawn_one()));
        }
        for h in handles {
            let w = h
                .join()
                .map_err(|_| Error::vm_msg("pool spawn thread panicked".to_owned()))??;
            idle.push(IdleEntry {
                worker: w,
                last_used: Instant::now(),
            });
        }
    }
    // `alive` counts every live worker, idle or checked out.
    let initial = idle.len();
    let pool = Arc::new(HiddenPool {
        state: Arc::new(Mutex::new(PoolState {
            idle,
            alive: initial,
            waiting: 0,
        })),
        available: Arc::new(Condvar::new()),
        dirty: Some(Arc::new(Mutex::new(VecDeque::new()))),
        dirty_pending: Some(Arc::new(Condvar::new())),
        socks_dir,
        shutting_down: Arc::new(AtomicBool::new(false)),
        spawn_cfg: Arc::clone(&spawn_cfg),
        policy,
    });
    let wait_handles = pool.wait_handles();
    // Replenisher: keeps `alive` topped up to `policy.min`.
    let h_replenish = {
        let h = wait_handles.clone();
        std::thread::Builder::new()
            .name("supermachine-pool-replenish".into())
            .spawn(move || replenisher_loop(h))
            .map_err(|e| Error::vm_msg(format!("spawn replenisher thread: {e}")))?
    };
    let mut handles = vec![h_replenish];
    if policy.restore_on_release {
        // Roughly max/2 restorer threads, clamped to 1..=4 and never above max.
        let restorer_count = ((policy.max + 1) / 2)
            .clamp(1, 4)
            .min(policy.max.max(1));
        for _ in 0..restorer_count {
            let h = wait_handles.clone();
            let h_restore = std::thread::Builder::new()
                .name("supermachine-pool-restore".into())
                .spawn(move || restorer_loop(h))
                .map_err(|e| Error::vm_msg(format!("spawn restorer thread: {e}")))?;
            handles.push(h_restore);
        }
    }
    // The janitor only runs when idle eviction is enabled.
    if pool.policy.idle_timeout != Duration::MAX {
        let h = wait_handles.clone();
        let h_janitor = std::thread::Builder::new()
            .name("supermachine-pool-janitor".into())
            .spawn(move || janitor_loop(h))
            .map_err(|e| Error::vm_msg(format!("spawn janitor thread: {e}")))?;
        handles.push(h_janitor);
    }
    // Detach the maintenance threads; they observe `shutting_down` to exit.
    drop(handles);
    Ok(pool)
}
/// Return the image's default pool, building it with default policy on first use.
fn ensure_default_pool(
    &self,
    config: &VmConfig,
) -> Result<&Arc<HiddenPool>, Error> {
    // Fast path: a pool already exists (from a prior acquire or pool().build()).
    if let Some(existing) = self.hidden_pool.get() {
        return Ok(existing);
    }
    // Build outside the cell, then publish. If another thread won the race,
    // our freshly built pool is dropped and the winner is returned.
    let built = self.build_pool_arc(config, PoolPolicy::default())?;
    Ok(self.hidden_pool.get_or_init(|| built))
}
}
/// A cloneable bundle of the pool's shared state handed to the background
/// maintenance threads, so they don't keep the `HiddenPool` itself alive.
#[derive(Clone)]
struct PoolWaitHandles {
    /// Idle worker list plus alive/waiting counters.
    state: Arc<Mutex<PoolState>>,
    /// Signaled when workers become available or counts change.
    available: Arc<Condvar>,
    /// Queue of released workers awaiting re-restore (populated only when
    /// `restore_on_release` is enabled; `None` otherwise).
    dirty: Option<Arc<Mutex<VecDeque<Worker>>>>,
    /// Signaled when a worker is pushed onto `dirty`.
    dirty_pending: Option<Arc<Condvar>>,
    /// Set when the pool shuts down; threads exit on observing it.
    shutting_down: Arc<AtomicBool>,
    /// Everything needed to spawn a replacement worker.
    spawn_cfg: Arc<SpawnConfig>,
    /// Pool sizing/eviction policy (copied by value).
    policy: PoolPolicy,
}
impl HiddenPool {
    /// Clone out the shared handles the maintenance threads (replenisher,
    /// restorer, janitor) need, without handing them the pool itself.
    fn wait_handles(&self) -> PoolWaitHandles {
        PoolWaitHandles {
            state: self.state.clone(),
            available: self.available.clone(),
            dirty: self.dirty.clone(),
            dirty_pending: self.dirty_pending.clone(),
            shutting_down: self.shutting_down.clone(),
            spawn_cfg: self.spawn_cfg.clone(),
            policy: self.policy,
        }
    }
}
/// Background thread: re-restores "dirty" workers (released after use) from
/// the base snapshot and returns them to the idle list.
///
/// Only meaningful when the pool was built with `restore_on_release`;
/// otherwise the dirty queue/condvar are `None` and the thread exits at once.
fn restorer_loop(h: PoolWaitHandles) {
    let (Some(dirty), Some(pending)) = (h.dirty.as_ref(), h.dirty_pending.as_ref()) else {
        return;
    };
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        // Block (in 100ms slices so shutdown is noticed) until a dirty worker
        // is queued; the queue lock is released once one is popped.
        let mut worker = {
            let mut q = match dirty.lock() {
                Ok(q) => q,
                Err(_) => return,
            };
            loop {
                if h.shutting_down.load(Ordering::SeqCst) {
                    return;
                }
                if let Some(w) = q.pop_front() {
                    break w;
                }
                q = match pending.wait_timeout(q, Duration::from_millis(100)) {
                    Ok((g, _)) => g,
                    Err(_) => return,
                };
            }
        };
        let snap_path = h.spawn_cfg.snapshot_path.clone();
        match worker.send_restore(&snap_path) {
            Ok(()) => {
                // Restored: return the worker to the idle list and wake waiters.
                if let Ok(mut s) = h.state.lock() {
                    s.idle.push(IdleEntry {
                        worker,
                        last_used: Instant::now(),
                    });
                    h.available.notify_all();
                }
            }
            Err(_) => {
                // Restore failed: discard the worker. Dropping `alive` lets the
                // replenisher spawn a replacement.
                worker.shutdown();
                if let Ok(mut s) = h.state.lock() {
                    s.alive = s.alive.saturating_sub(1);
                }
                h.available.notify_all();
            }
        }
    }
}
/// Background thread: keeps the pool topped up to `policy.min` live workers.
///
/// A slot is reserved (`alive += 1`) under the lock before spawning and rolled
/// back if the spawn fails, so concurrent maintenance threads never over-spawn.
/// Unlike the original, a poisoned state lock now makes the thread exit (as it
/// already did at the other lock sites) instead of falling through and
/// spawning a worker without having reserved its slot.
fn replenisher_loop(h: PoolWaitHandles) {
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        // Check demand and, when there is none, wait for a wake-up (timing out
        // after 100ms so the shutdown flag is re-checked) under a single lock
        // acquisition instead of the original check/unlock/relock dance.
        {
            let mut s = match h.state.lock() {
                Ok(s) => s,
                Err(_) => return,
            };
            if s.alive >= h.policy.min {
                let _ = h.available.wait_timeout(s, Duration::from_millis(100));
                continue;
            }
            // Reserve the slot before spawning so other threads observe it.
            s.alive += 1;
        }
        match h.spawn_cfg.spawn_one() {
            Ok(w) => {
                if let Ok(mut s) = h.state.lock() {
                    s.idle.push(IdleEntry {
                        worker: w,
                        last_used: Instant::now(),
                    });
                    h.available.notify_all();
                }
            }
            Err(_) => {
                // Roll back the reservation, then back off so a persistently
                // failing spawn doesn't busy-loop.
                if let Ok(mut s) = h.state.lock() {
                    s.alive = s.alive.saturating_sub(1);
                }
                std::thread::sleep(Duration::from_millis(500));
            }
        }
    }
}
/// Background thread: evicts workers that have sat idle longer than
/// `policy.idle_timeout`, never shrinking the pool below `policy.min`.
fn janitor_loop(h: PoolWaitHandles) {
    let timeout = h.policy.idle_timeout;
    let min = h.policy.min;
    // Duration::MAX means "never evict" (the caller also skips spawning us).
    if timeout == Duration::MAX {
        return;
    }
    // Scan every timeout/4 (at least 100ms); sleep in small condvar waits so
    // shutdown is observed promptly.
    let tick = (timeout / 4).max(Duration::from_millis(100));
    let wait_unit = Duration::from_millis(100).min(tick);
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        let mut to_evict: Vec<Worker> = Vec::new();
        if let Ok(mut s) = h.state.lock() {
            let now = Instant::now();
            // Eviction scans from the front of `idle`, which is treated as the
            // least-recently-used end (entries are appended on release).
            while s.alive > min && !s.idle.is_empty() {
                let oldest = &s.idle[0];
                if now.duration_since(oldest.last_used) < timeout {
                    break;
                }
                let entry = s.idle.remove(0);
                s.alive -= 1;
                to_evict.push(entry.worker);
            }
        }
        // Shut workers down outside the state lock.
        for mut w in to_evict {
            w.shutdown();
        }
        // Sleep for one tick in wait_unit-sized chunks, waking early on
        // shutdown or pool activity.
        let mut remaining = tick;
        while remaining > Duration::ZERO && !h.shutting_down.load(Ordering::SeqCst) {
            let chunk = remaining.min(wait_unit);
            if let Ok(s) = h.state.lock() {
                let _ = h.available.wait_timeout(s, chunk);
            }
            remaining = remaining.saturating_sub(chunk);
        }
    }
}
/// A VM checked out of a pool; derefs to [`Vm`] and returns its worker to the
/// pool on drop.
pub struct PooledVm<'a> {
    /// Vm facade over the worker's vsock endpoints; taken on drop.
    vm: Option<Vm>,
    /// Underlying pool worker; handed back via `release` on drop.
    worker: Option<Worker>,
    /// Owning handle to the pool the worker came from.
    pool_arc: Arc<HiddenPool>,
    /// Ties the borrow to the originating `Image` without storing it.
    _image: std::marker::PhantomData<&'a Image>,
}
impl<'a> std::ops::Deref for PooledVm<'a> {
    type Target = Vm;
    /// Panics if the handle is used after the worker was released.
    fn deref(&self) -> &Vm {
        match self.vm {
            Some(ref vm) => vm,
            None => panic!("PooledVm used after drop"),
        }
    }
}

impl<'a> std::ops::DerefMut for PooledVm<'a> {
    /// Panics if the handle is used after the worker was released.
    fn deref_mut(&mut self) -> &mut Vm {
        match self.vm {
            Some(ref mut vm) => vm,
            None => panic!("PooledVm used after drop"),
        }
    }
}
impl<'a> PooledVm<'a> {
    /// Snapshot this VM's current state into `dest_dir`, producing a loadable
    /// [`Image`] (writes `restore.snap` plus `metadata.json`).
    ///
    /// Errors if the handle was already released or the VM was acquired from
    /// an `Image` that carries no metadata (pre-0.3.8).
    pub fn snapshot(&self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
        let dest_dir = dest_dir.into();
        let _span = tracing::info_span!(
            "supermachine.snapshot",
            dest_dir = %dest_dir.display(),
        )
        .entered();
        let worker = self
            .worker
            .as_ref()
            .ok_or_else(|| Error::vm_msg("PooledVm: no worker (already dropped?)".to_owned()))?;
        let vm = self
            .vm
            .as_ref()
            .ok_or_else(|| Error::vm_msg("PooledVm: no vm (already dropped?)".to_owned()))?;
        let meta = vm.image_meta.clone().ok_or_else(|| {
            Error::vm_msg(
                "PooledVm::snapshot: image metadata missing (acquire from a 0.3.8+ Image)"
                    .to_owned(),
            )
        })?;
        std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
        let snap_path = dest_dir.join("restore.snap");
        // Multi-vCPU guests are parked around the snapshot — presumably to
        // quiesce the vCPUs; confirm against the worker protocol.
        let parked = if meta.vcpus > 1 {
            worker.send_smpark_park()?
        } else {
            false
        };
        // Opt-in diff-cycle mode snapshots against the worker's last restore base.
        let snap_result = if std::env::var_os("SUPERMACHINE_DIFF_CYCLE_SNAPSHOT").is_some() {
            worker.send_snapshot_with_base(&snap_path, &worker.last_restore_path)
        } else {
            worker.send_snapshot(&snap_path)
        };
        // Unpark before surfacing any snapshot error so the VM stays usable;
        // note an unpark failure here takes precedence over a snapshot error.
        if parked {
            let _ = worker.send_smpark_unpark()?;
        }
        let _stats = snap_result?;
        // Metadata mirrors what the baker writes so Image::from_snapshot can load it.
        let metadata = serde_json::json!({
            "memory_mib": meta.memory_mib,
            "vcpus": meta.vcpus,
            "layers": meta
                .layers
                .iter()
                .map(|p| p.to_string_lossy().to_string())
                .collect::<Vec<_>>(),
            "delta_squashfs": meta
                .delta_squashfs
                .as_ref()
                .map(|p| p.to_string_lossy().to_string()),
            "snapshot_base": snap_path.to_string_lossy().to_string(),
            "baked_at": chrono_rfc3339_now(),
            "source": "PooledVm::snapshot",
        });
        std::fs::write(
            dest_dir.join("metadata.json"),
            serde_json::to_string_pretty(&metadata)
                .map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
        )
        .map_err(Error::Io)?;
        Image::from_snapshot(&dest_dir)
    }
}
impl<'a> Drop for PooledVm<'a> {
    /// Tear down the Vm facade first, then hand the worker back to its pool.
    fn drop(&mut self) {
        drop(self.vm.take());
        if let Some(w) = self.worker.take() {
            self.pool_arc.release(w);
        }
    }
}
/// Fluent configuration for an [`Image`]'s worker pool; finish with `build()`.
pub struct PoolBuilder<'a> {
    /// Image whose snapshot the pooled workers restore from.
    image: &'a Image,
    /// Sizing/eviction/acquire policy being accumulated.
    policy: PoolPolicy,
    /// Per-VM overrides (memory, vcpus, restore timeout).
    vm_config: VmConfig,
}
impl<'a> PoolBuilder<'a> {
pub fn min(mut self, n: usize) -> Self {
self.policy.min = n;
self
}
pub fn max(mut self, n: usize) -> Self {
self.policy.max = n.max(1);
self
}
pub fn idle_timeout(mut self, d: Duration) -> Self {
self.policy.idle_timeout = d;
self
}
pub fn acquire_timeout(mut self, d: Duration) -> Self {
self.policy.acquire_timeout = Some(d);
self
}
pub fn no_acquire_timeout(mut self) -> Self {
self.policy.acquire_timeout = None;
self
}
pub fn restore_on_release(mut self, on: bool) -> Self {
self.policy.restore_on_release = on;
self
}
pub fn with_restore_timeout(mut self, timeout: Duration) -> Self {
self.vm_config = std::mem::take(&mut self.vm_config).with_restore_timeout(timeout);
self
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.vm_config = std::mem::take(&mut self.vm_config).with_memory_mib(mib);
self
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vm_config = std::mem::take(&mut self.vm_config).with_vcpus(vcpus);
self
}
pub fn build(self) -> Result<Pool, Error> {
let image = self.image;
let mut policy = self.policy;
if policy.min > policy.max {
policy.min = policy.max;
}
let arc = image.build_pool_arc(&self.vm_config, policy)?;
match image.hidden_pool.set(Arc::clone(&arc)) {
Ok(()) => Ok(Pool { inner: arc }),
Err(_) => Err(Error::vm_msg(
"image.pool().build() must be called before image.acquire() / \
image.acquire_with(). The Image already has a default pool \
from an earlier acquire — its policy can't be changed in \
place. Either call pool().build() first, or load the Image \
fresh: `Image::from_snapshot(path)?.pool()...build()?`."
.to_owned(),
)),
}
}
}
/// Cloneable public handle to an explicitly built worker pool
/// (see [`PoolBuilder::build`]).
#[derive(Clone)]
pub struct Pool {
    inner: Arc<HiddenPool>,
}
/// Point-in-time snapshot of pool occupancy counters.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Total live workers (idle + checked out).
    pub alive: usize,
    /// Workers currently checked out (`alive - idle`).
    pub in_use: usize,
    /// Workers parked in the idle list.
    pub idle: usize,
    /// Acquirers currently blocked waiting for a worker.
    pub waiting: usize,
    /// Policy cap on pool size.
    pub max: usize,
    /// Policy floor the replenisher maintains.
    pub min: usize,
}
impl Pool {
    /// Check a VM out of this pool, blocking per the pool's acquire policy.
    pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
        let cfg = &self.inner.spawn_cfg;
        let _span = tracing::info_span!(
            "supermachine.pool.acquire",
            memory_mib = cfg.memory_mib,
            vcpus = cfg.vcpus,
        )
        .entered();
        let worker = self.inner.acquire()?;
        // Record sizing/layers so PooledVm::snapshot can write faithful metadata.
        let meta = ImageMeta {
            memory_mib: cfg.memory_mib,
            vcpus: cfg.vcpus,
            layers: cfg.layers.clone(),
            delta_squashfs: cfg.delta_squashfs.clone(),
        };
        let vm = Vm {
            pool: None,
            vsock_mux_path: worker.vsock_mux_path.clone(),
            vsock_exec_path: worker.vsock_exec_path.clone(),
            own_vsock_mux_dir: None,
            skip_cleanup: true,
            image_meta: Some(Arc::new(meta)),
        };
        Ok(PooledVm {
            vm: Some(vm),
            worker: Some(worker),
            pool_arc: Arc::clone(&self.inner),
            _image: std::marker::PhantomData,
        })
    }

    /// Point-in-time occupancy counters; all-zero counts if the state lock is
    /// poisoned.
    pub fn stats(&self) -> PoolStats {
        let (alive, idle, waiting) = match self.inner.state.lock() {
            Ok(s) => (s.alive, s.idle.len(), s.waiting),
            Err(_) => (0, 0, 0),
        };
        PoolStats {
            alive,
            in_use: alive.saturating_sub(idle),
            idle,
            waiting,
            max: self.inner.policy.max,
            min: self.inner.policy.min,
        }
    }
}
/// Builder for baking an OCI image into a supermachine snapshot [`Image`].
pub struct OciImageBuilder {
    /// OCI reference (or `oci-archive:`/`oci-layout:` pseudo-ref) to bake.
    image: String,
    /// Explicit snapshot name; derived from `image` when unset.
    name: Option<String>,
    /// When to pull vs. reuse the cached snapshot.
    pull_policy: PullPolicy,
    /// Guest memory (MiB); 256 at build time when unset.
    memory_mib: Option<u32>,
    /// vCPU count; 1 at build time when unset.
    vcpus: Option<u32>,
    /// Guest port handed to the baker; 80 at build time when unset.
    guest_port: Option<u16>,
    /// Container command override (JSON-encoded argv at build time).
    cmd: Option<Vec<String>>,
    /// Extra environment variables passed to the baker as `--env k=v`.
    envs: Vec<(String, String)>,
    /// Snapshot store root; `default_snapshots_dir()` when unset.
    snapshots_dir: Option<PathBuf>,
    /// Warmup callback run against the booted VM during the pipelined bake.
    warmup: Option<Box<dyn FnOnce(&Vm) -> Result<(), Error> + Send>>,
    /// Cache key distinguishing warm snapshots; "default" when unset.
    warmup_tag: Option<String>,
    /// (host_path, guest_path) files copied into the image.
    extra_files: Vec<(PathBuf, String)>,
    /// (host_path, guest_tag, symlink policy) shared mounts.
    mounts: Vec<(PathBuf, String, crate::vmm::resources::SymlinkPolicy)>,
    /// (host_path, guest_path, size_bytes) writable volumes.
    volumes: Vec<(PathBuf, String, u64)>,
    /// Extra kernel cmdline tokens (`--cmdline-extra`).
    extra_kernel_args: Vec<String>,
    /// When true, passes `--supermachine-listener-required` to the baker and
    /// disables the pre-exec warmup trigger.
    require_listener: Option<bool>,
}
impl OciImageBuilder {
/// Start a builder for `image_ref` with every option unset; defaults are
/// applied at `build()` time (default pull policy, 256 MiB, 1 vCPU, port 80).
pub fn new(image_ref: impl Into<String>) -> Self {
    Self {
        image: image_ref.into(),
        name: None,
        pull_policy: PullPolicy::default(),
        memory_mib: None,
        vcpus: None,
        guest_port: None,
        cmd: None,
        envs: Vec::new(),
        snapshots_dir: None,
        warmup: None,
        warmup_tag: None,
        extra_files: Vec::new(),
        mounts: Vec::new(),
        volumes: Vec::new(),
        extra_kernel_args: Vec::new(),
        require_listener: None,
    }
}
/// Copy a single host file into the baked image at `guest_path`.
pub fn with_extra_file(
    mut self,
    host_path: impl Into<PathBuf>,
    guest_path: impl Into<String>,
) -> Self {
    let entry = (host_path.into(), guest_path.into());
    self.extra_files.push(entry);
    self
}

/// Bake from a local OCI archive file instead of a registry reference.
pub fn with_oci_archive(mut self, archive_path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = archive_path.into();
    let original = self.image;
    self.image = format!("oci-archive:{}", path.display());
    // Keep a stable snapshot name derived from the original reference.
    if self.name.is_none() {
        self.name = Some(crate::bake::snapshot_name_for_image(&original));
    }
    self
}

/// Bake from a local OCI layout directory instead of a registry reference.
pub fn with_oci_layout(mut self, layout_dir: impl Into<PathBuf>) -> Self {
    let path: PathBuf = layout_dir.into();
    let original = self.image;
    self.image = format!("oci-layout:{}", path.display());
    // Keep a stable snapshot name derived from the original reference.
    if self.name.is_none() {
        self.name = Some(crate::bake::snapshot_name_for_image(&original));
    }
    self
}

/// Share a host directory into the guest under `guest_tag`, using the
/// default symlink policy.
pub fn with_mount(
    self,
    host_path: impl Into<PathBuf>,
    guest_tag: impl Into<String>,
) -> Self {
    self.with_mount_symlinks(
        host_path,
        guest_tag,
        crate::vmm::resources::SymlinkPolicy::default(),
    )
}

/// Share a host directory into the guest with an explicit symlink policy.
pub fn with_mount_symlinks(
    mut self,
    host_path: impl Into<PathBuf>,
    guest_tag: impl Into<String>,
    symlinks: crate::vmm::resources::SymlinkPolicy,
) -> Self {
    let entry = (host_path.into(), guest_tag.into(), symlinks);
    self.mounts.push(entry);
    self
}

/// Attach a writable volume described by `spec`.
pub fn with_volume(mut self, spec: crate::vmm::resources::VolumeSpec) -> Self {
    let host = PathBuf::from(spec.host_path);
    self.volumes.push((host, spec.guest_path, spec.size_bytes));
    self
}
pub fn with_extra_kernel_arg(mut self, arg: impl Into<String>) -> Self {
self.extra_kernel_args.push(arg.into());
self
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vcpus = Some(vcpus);
self
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_pull_policy(mut self, policy: PullPolicy) -> Self {
self.pull_policy = policy;
self
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.memory_mib = Some(mib);
self
}
pub fn with_guest_port(mut self, port: u16) -> Self {
self.guest_port = Some(port);
self
}
pub fn with_cmd<I, S>(mut self, cmd: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.cmd = Some(cmd.into_iter().map(Into::into).collect());
self
}
pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.envs.push((key.into(), value.into()));
self
}
pub fn with_snapshots_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.snapshots_dir = Some(dir.into());
self
}
pub fn with_warmup<F>(mut self, warmup: F) -> Self
where
F: FnOnce(&Vm) -> Result<(), Error> + Send + 'static,
{
self.warmup = Some(Box::new(warmup));
self
}
pub fn with_warmup_tag(mut self, tag: impl Into<String>) -> Self {
self.warmup_tag = Some(tag.into());
self
}
pub fn with_listener_required(mut self) -> Self {
self.require_listener = Some(true);
self
}
/// Resolve this builder into a baked [`Image`].
///
/// Fast paths: with `PullPolicy::Never`, only a loadable cached snapshot is
/// returned (otherwise CacheInvalid/CacheMiss); with a warmup, a previously
/// baked warm snapshot for the same tag short-circuits the bake. Otherwise the
/// pipelined bake runs, optionally invoking the warmup callback inside the
/// guest, and any still-live warm worker is handed off to the new Image.
pub fn build(self) -> Result<Image, Error> {
    let snapshots_dir = self
        .snapshots_dir
        .unwrap_or_else(default_snapshots_dir);
    let derived_name = self
        .name
        .clone()
        .unwrap_or_else(|| crate::bake::snapshot_name_for_image(&self.image));
    let snap_dir = snapshots_dir.join(&derived_name);
    let cache_loadable = Image::from_snapshot(&snap_dir).is_ok();
    match self.pull_policy {
        PullPolicy::Never => {
            if cache_loadable {
                return Image::from_snapshot(&snap_dir);
            }
            // restore.snap present but unloadable: stale/foreign bake.
            let restore_snap = snap_dir.join("restore.snap");
            if restore_snap.is_file() {
                return Err(Error::cache_invalid(format!(
                    "snapshot present at {} but not loadable on this binary; \
                     rebake required (PullPolicy::Never won't auto-rebake)",
                    snap_dir.display()
                )));
            }
            return Err(Error::cache_miss(format!(
                "no cached snapshot for {} at {} (PullPolicy::Never)",
                self.image,
                snap_dir.display()
            )));
        }
        _ => {}
    }
    // Translate builder options into baker CLI arguments.
    let mut extra_args: Vec<String> = Vec::new();
    for (k, v) in &self.envs {
        extra_args.push("--env".to_owned());
        extra_args.push(format!("{k}={v}"));
    }
    for (host, guest) in &self.extra_files {
        extra_args.push("--extra-file".to_owned());
        extra_args.push(format!("{}:{}", host.display(), guest));
    }
    for (host, tag, symlinks) in &self.mounts {
        extra_args.push("--mount".to_owned());
        use crate::vmm::resources::SymlinkPolicy;
        // Opaque is the unsuffixed spelling; deny/follow are explicit suffixes.
        let s = match symlinks {
            SymlinkPolicy::Opaque => format!("{}:{}", host.display(), tag),
            SymlinkPolicy::Deny => format!("{}:{}:deny", host.display(), tag),
            SymlinkPolicy::Follow => format!("{}:{}:follow", host.display(), tag),
        };
        extra_args.push(s);
    }
    for (host, guest, size_bytes) in &self.volumes {
        extra_args.push("--volume".to_owned());
        extra_args.push(format!("{}:{}:{}", host.display(), guest, size_bytes));
    }
    for token in &self.extra_kernel_args {
        extra_args.push("--cmdline-extra".to_owned());
        extra_args.push(token.clone());
    }
    if self.require_listener.unwrap_or(false) {
        extra_args.push("--supermachine-listener-required".to_owned());
    }
    // The command override travels as a JSON-encoded argv string.
    let cmd_override = match &self.cmd {
        Some(argv) => Some(
            serde_json::to_string(argv)
                .map_err(|e| Error::bake_msg(format!("encode cmd: {e}")))?,
        ),
        None => None,
    };
    let root = repo_root_for_bake()?;
    let request = crate::bake::BakeRequest {
        image: self.image.clone(),
        name: self.name.clone(),
        runtime: "supermachine".to_owned(),
        guest_port: self.guest_port.unwrap_or(80),
        memory_mib: self.memory_mib.unwrap_or(256),
        vcpus: self.vcpus.unwrap_or(1),
        pull_policy: self.pull_policy.as_bake_str().to_owned(),
        snapshots_dir: snapshots_dir.clone(),
        cmd_override,
        extra_args,
    };
    if self.warmup.is_none() {
        // No warmup: run the pipelined bake with the warm snapshot skipped
        // (the no-op callback exists only to drive the pipeline).
        let trace = std::env::var_os("SUPERMACHINE_BAKE_TRACE").is_some();
        let bake_t0 = std::time::Instant::now();
        let use_pre_exec = !self.require_listener.unwrap_or(false);
        let pipelined = crate::bake::PipelinedWarmup {
            warm_dir: snapshots_dir.join(format!("{}__warm__unused", derived_name)),
            warm_tag: "unused".to_owned(),
            keep_alive: true,
            skip_warm_snapshot: true,
            use_pre_exec_trigger: use_pre_exec,
            callback: Box::new(|_ctx| Ok(())),
        };
        match crate::bake::run_push_pipelined(&request, bake_t0, &root, pipelined) {
            Ok(warm_handoff) => {
                if trace {
                    eprintln!(
                        "[bake-trace] always-pipelined (skip-warm) total: {:?} (bg save \
                         may still be in flight)",
                        bake_t0.elapsed()
                    );
                }
                // Snapshot save may still be in flight; load in pending mode.
                let img = Image::from_snapshot_pending(&snap_dir)?;
                // Stash the still-warm worker so the first pool can reuse it.
                if let Some(bw) = warm_handoff {
                    *img.warm_baked_worker.inner.lock().unwrap() = Some(bw);
                }
                return Ok(img);
            }
            Err(msg) => {
                return Err(map_bake_error(&request.image, msg));
            }
        }
    }
    let warmup = self.warmup.unwrap();
    let tag = self.warmup_tag.as_deref().unwrap_or("default");
    let warm_dir = snapshots_dir.join(format!("{}__warm__{}", derived_name, tag));
    // A warm snapshot for this tag already exists: reuse it outright.
    if let Ok(image) = Image::from_snapshot(&warm_dir) {
        return Ok(image);
    }
    let trace = std::env::var_os("SUPERMACHINE_BAKE_TRACE").is_some();
    let bake_t0 = std::time::Instant::now();
    // Side channels out of the warmup closure: the typed warmup error (so it
    // isn't flattened to a string) and the instant the warmup started.
    let warmup_err: std::sync::Arc<std::sync::Mutex<Option<Error>>> =
        std::sync::Arc::new(std::sync::Mutex::new(None));
    let warmup_err_inner = warmup_err.clone();
    let warmup_t0_capture: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>> =
        std::sync::Arc::new(std::sync::Mutex::new(None));
    let warmup_t0_inner = warmup_t0_capture.clone();
    let pipelined = crate::bake::PipelinedWarmup {
        warm_dir: warm_dir.clone(),
        warm_tag: tag.to_owned(),
        keep_alive: true,
        skip_warm_snapshot: false,
        use_pre_exec_trigger: !self.require_listener.unwrap_or(false),
        callback: Box::new(move |ctx| {
            if let Ok(mut g) = warmup_t0_inner.lock() {
                *g = Some(std::time::Instant::now());
            }
            // Synthetic Vm facade over the baker's vsock endpoints; cleanup is
            // skipped because the baker owns the underlying resources.
            let synth_vm = Vm {
                pool: None,
                vsock_mux_path: ctx.vsock_mux_path.clone(),
                vsock_exec_path: ctx.vsock_exec_path.clone(),
                own_vsock_mux_dir: None,
                skip_cleanup: true,
                image_meta: None,
            };
            let result = warmup(&synth_vm);
            drop(synth_vm);
            match result {
                Ok(()) => Ok(()),
                Err(e) => {
                    // Report a string to the pipeline, keep the typed error.
                    let msg = e.to_string();
                    if let Ok(mut g) = warmup_err_inner.lock() {
                        *g = Some(e);
                    }
                    Err(msg)
                }
            }
        }),
    };
    match crate::bake::run_push_pipelined(&request, bake_t0, &root, pipelined) {
        Ok(warm_handoff) => {
            if trace {
                eprintln!(
                    "[bake-trace] pipelined bake total: {:?}",
                    bake_t0.elapsed()
                );
                if let Some(t0) = warmup_t0_capture.lock().ok().and_then(|g| *g) {
                    eprintln!("[bake-trace] (warmup ran at +{:?})", t0 - bake_t0);
                }
            }
            let img = Image::from_snapshot(&warm_dir)?;
            if let Some(bw) = warm_handoff {
                *img.warm_baked_worker.inner.lock().unwrap() = Some(bw);
            }
            Ok(img)
        }
        Err(msg) => {
            // Prefer the typed warmup error captured above, if any.
            if let Some(e) = warmup_err.lock().ok().and_then(|mut g| g.take()) {
                return Err(e);
            }
            Err(map_bake_error(&request.image, msg))
        }
    }
}
}
/// Warn on stderr when a snapshot was baked by a different crate version than
/// the one currently running (stale init/kernel fixes).
fn warn_if_snapshot_version_mismatch(
    meta: &serde_json::Value,
    snapshot_path: &Path,
) {
    let current = env!("CARGO_PKG_VERSION");
    // Prefer the explicit marker; fall back to sniffing a vX.Y.Z segment out
    // of the recorded kernel path for snapshots that predate the marker.
    let explicit = meta
        .get("baked_by_version")
        .and_then(|v| v.as_str())
        .map(str::to_owned);
    let sniffed = || {
        meta.get("kernel")
            .and_then(|v| v.as_str())
            .and_then(parse_version_segment)
    };
    let Some(baked) = explicit.or_else(sniffed) else {
        return;
    };
    if baked != current {
        eprintln!(
            "supermachine: warning: snapshot at {} was baked under \
             v{baked}; current binaries are v{current}. Re-bake \
             (delete the snapshot dir and rerun `supermachine run`) \
             to pick up init/kernel fixes shipped since v{baked}.",
            snapshot_path.display()
        );
    }
}
/// Extract a version string "X.Y.Z…" from the first path component of the
/// form `vX.Y.Z…` (e.g. `/opt/sm/v0.3.8/kernel` -> `"0.3.8"`).
///
/// A candidate needs at least three dot-separated segments, and every segment
/// must be one or more ASCII digits. The original accepted degenerate
/// components like `v...` because empty segments are vacuously "all digits";
/// segments must now be non-empty.
fn parse_version_segment(kernel_path: &str) -> Option<String> {
    Path::new(kernel_path).components().find_map(|comp| {
        // Skip components that aren't valid UTF-8 or don't start with 'v'.
        let rest = comp.as_os_str().to_str()?.strip_prefix('v')?;
        let segments: Vec<&str> = rest.split('.').collect();
        if segments.len() < 3 {
            return None;
        }
        let numeric = segments
            .iter()
            .all(|seg| !seg.is_empty() && seg.bytes().all(|b| b.is_ascii_digit()));
        numeric.then(|| rest.to_owned())
    })
}
/// Parse the `symlinks` metadata field; accepts "deny", "opaque" or "follow".
fn parse_symlink_policy(s: &str) -> Option<crate::vmm::resources::SymlinkPolicy> {
    use crate::vmm::resources::SymlinkPolicy as P;
    match s {
        "opaque" => Some(P::Opaque),
        "follow" => Some(P::Follow),
        "deny" => Some(P::Deny),
        // Anything else (including the empty string) is unrecognized.
        _ => None,
    }
}
fn default_snapshots_dir() -> PathBuf {
if let Some(d) = std::env::var_os("SUPERMACHINE_SNAPSHOTS") {
return PathBuf::from(d);
}
let home = std::env::var_os("HOME")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("."));
home.join(".local/supermachine-snapshots")
}
fn repo_root_for_bake() -> Result<PathBuf, Error> {
if let Some(root) = std::env::var_os("SUPERMACHINE_ROOT") {
return Ok(PathBuf::from(root));
}
let exe = std::env::current_exe()
.map_err(|e| Error::bake_msg(format!("current_exe: {e}")))?;
for ancestor in exe.ancestors() {
if ancestor.join("tools/supermachine-push").is_file() {
return Ok(ancestor.to_path_buf());
}
if ancestor.join("share/supermachine/kernel").is_file() {
return Ok(ancestor.to_path_buf());
}
}
std::env::current_dir().map_err(|e| Error::bake_msg(format!("current_dir: {e}")))
}
/// Classify a raw bake-pipeline error string into a typed [`Error`].
fn map_bake_error(image: &str, msg: String) -> Error {
    // Kernel panics travel in-band as "KERNEL_PANIC|<first line>|<frames>",
    // with frames separated by the ASCII unit separator (0x1F).
    if let Some(rest) = msg.strip_prefix("KERNEL_PANIC|") {
        let (first_line, stack_raw) = match rest.split_once('|') {
            Some((first, frames)) => (first.to_owned(), Some(frames)),
            None => (rest.to_owned(), None),
        };
        let stack: Vec<String> = stack_raw
            .map(|s| s.split('\x1F').map(ToOwned::to_owned).collect())
            .unwrap_or_default();
        return Error::kernel_panic(first_line, stack);
    }
    let lc = msg.to_ascii_lowercase();
    let has_status = |code: u16| lc.contains(&format!("http {code}"));
    // "not found" only counts in a registry/manifest/image context; the 404
    // and "* unknown" markers always count (this mirrors the original's
    // operator precedence, made explicit with parentheses).
    let registry_context =
        lc.contains("registry") || lc.contains("manifest") || lc.contains("image");
    if has_status(404)
        || lc.contains("manifest unknown")
        || lc.contains("name unknown")
        || (lc.contains("not found") && registry_context)
    {
        return Error::image_not_found(image, msg);
    }
    if has_status(401)
        || has_status(403)
        || lc.contains("unauthorized")
        || lc.contains("forbidden")
        || lc.contains("auth challenge")
    {
        return Error::registry_auth(image, msg);
    }
    // curl exit markers: 6/7 resolve+connect, 28 timeout, 35 TLS, 56 recv.
    let curl_markers = [
        "curl: (6)",
        "curl: (7)",
        "curl: (28)",
        "curl: (35)",
        "curl: (56)",
    ];
    if lc.contains("could not resolve")
        || lc.contains("dns")
        || lc.contains("connection refused")
        || lc.contains("connection reset")
        || lc.contains("network is unreachable")
        || lc.contains("ssl_connect")
        || lc.contains("tls handshake")
        || curl_markers.iter().any(|m| lc.contains(m))
    {
        return Error::registry_unreachable(msg);
    }
    if lc.contains("registry") || lc.contains("manifest") || lc.contains("docker pull") {
        Error::network_msg(msg)
    } else {
        Error::bake_msg(msg)
    }
}
/// Optional per-VM overrides; unset fields fall back to image/baked defaults.
#[derive(Debug, Clone, Default)]
pub struct VmConfig {
    /// Guest memory (MiB) override.
    memory_mib: Option<u32>,
    /// vCPU count override.
    vcpus: Option<u32>,
    /// Explicit asset paths instead of discovery.
    assets: Option<AssetPaths>,
    /// Directory for the vsock mux/exec sockets.
    vsock_mux_dir: Option<PathBuf>,
    /// Timeout applied to snapshot restore.
    restore_timeout: Option<Duration>,
}
impl VmConfig {
    /// Empty config: every field unset, so image/baked defaults apply.
    pub fn new() -> Self {
        Self::default()
    }

    /// Override the guest memory size (MiB).
    pub fn with_memory_mib(self, mib: u32) -> Self {
        Self { memory_mib: Some(mib), ..self }
    }

    /// Override the vCPU count.
    pub fn with_vcpus(self, vcpus: u32) -> Self {
        Self { vcpus: Some(vcpus), ..self }
    }

    /// Use explicit asset paths instead of auto-discovery.
    pub fn with_assets(self, assets: AssetPaths) -> Self {
        Self { assets: Some(assets), ..self }
    }

    /// Directory in which to create the vsock mux/exec sockets.
    pub fn with_vsock_mux_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self { vsock_mux_dir: Some(dir.into()), ..self }
    }

    /// Timeout for restoring the VM from its snapshot.
    pub fn with_restore_timeout(self, timeout: Duration) -> Self {
        Self { restore_timeout: Some(timeout), ..self }
    }
}
/// A running (or pooled) microVM reachable over its vsock sockets.
pub struct Vm {
    /// Owning process handle when started via [`Vm::start`]; `None` for
    /// pool-backed or synthetic VMs.
    pool: Option<WarmPool>,
    /// General-purpose vsock mux socket path.
    vsock_mux_path: PathBuf,
    /// Exec/control vsock socket path.
    vsock_exec_path: PathBuf,
    /// Socket directory this Vm created itself (set only by `Vm::start` when
    /// the directory did not already exist).
    own_vsock_mux_dir: Option<PathBuf>,
    /// True for VMs whose underlying resources are owned elsewhere (pool or
    /// baker), so teardown is skipped.
    skip_cleanup: bool,
    /// Sizing/layer metadata recorded for later snapshotting.
    image_meta: Option<Arc<ImageMeta>>,
}
/// Sizing and disk-layer info captured at acquire/start time so that
/// `PooledVm::snapshot` can write faithful snapshot metadata.
#[derive(Clone, Debug)]
pub(crate) struct ImageMeta {
    /// Guest memory (MiB) the VM was started with.
    pub memory_mib: u32,
    /// vCPU count the VM was started with.
    pub vcpus: u32,
    /// Block-device layer paths attached to the VM.
    pub layers: Vec<PathBuf>,
    /// Optional delta squashfs attached after the layers.
    pub delta_squashfs: Option<PathBuf>,
}
impl Vm {
/// Boot a dedicated VM by restoring `image`'s snapshot (no pool involved).
///
/// Kernel preference: the image's bundled kernel, then discovered assets.
/// Creates unique vsock mux/exec sockets, attaches image layers (plus the
/// optional delta squashfs) as block devices, then restores the snapshot with
/// the configured (default 10s) timeout.
pub fn start(image: &Image, config: &VmConfig) -> Result<Vm, Error> {
    // macOS: fail fast with a readable message when this process lacks the
    // hypervisor entitlement, instead of a cryptic downstream error.
    #[cfg(target_os = "macos")]
    if let Err(msg) = crate::codesign::check_self_has_hvf_entitlement() {
        return Err(Error::vm_msg(msg));
    }
    let assets = match &config.assets {
        Some(a) => a.clone(),
        None => AssetPaths::discover(),
    };
    let kernel: PathBuf = if let Some(k) = image.bundled_kernel.as_ref() {
        k.clone()
    } else if let Some(k) = assets.kernel.as_ref() {
        k.clone()
    } else {
        return Err(Error::assets_msg(
            "no kernel found: snapshot dir has no bundled kernel and AssetPaths::discover() came up empty; set VmConfig::with_assets() or $SUPERMACHINE_ASSETS_DIR".to_owned(),
        ));
    };
    let kernel = kernel.as_path();
    let dir = match &config.vsock_mux_dir {
        Some(d) => d.clone(),
        None => std::env::temp_dir(),
    };
    // Remember the dir only if we created it ourselves.
    let mut own_dir = None;
    if !dir.is_dir() {
        std::fs::create_dir_all(&dir).map_err(Error::Io)?;
        own_dir = Some(dir.clone());
    }
    // Socket names are made unique per process + suffix.
    let vsock_mux_path = dir.join(format!(
        "supermachine-vm-{}-{}.sock",
        std::process::id(),
        unique_suffix(),
    ));
    // The exec socket lives next to the mux socket with an "-exec" suffix.
    let vsock_exec_path = {
        let mut p = vsock_mux_path.clone();
        let mut name = p.file_name().unwrap().to_owned();
        name.push("-exec");
        p.set_file_name(name);
        p
    };
    // VmConfig overrides win over the image's baked-in sizing.
    let memory_mib = config.memory_mib.unwrap_or(image.memory_mib);
    let vcpus = config.vcpus.unwrap_or(image.vcpus);
    let mut resources = VmResources::new()
        .with_kernel_path(kernel.to_string_lossy().to_string())
        .with_memory_mib(memory_mib as usize)
        .with_vcpus(vcpus)
        .with_cow_restore(true)
        .with_restore(image.snapshot_path.to_string_lossy().to_string())
        .with_vsock_mux(vsock_mux_path.to_string_lossy().to_string())
        .with_vsock_exec(vsock_exec_path.to_string_lossy().to_string());
    // Layers (then the optional delta squashfs) attach in order as block devices.
    for layer in &image.layers {
        resources = resources.with_block_device(layer.to_string_lossy().to_string());
    }
    if let Some(delta) = &image.delta_squashfs {
        resources = resources.with_block_device(delta.to_string_lossy().to_string());
    }
    let options = RunOptions::default();
    let pool = WarmPool::start(resources, options).map_err(Error::from)?;
    let timeout = config
        .restore_timeout
        .unwrap_or_else(|| Duration::from_secs(10));
    let _ = pool
        .restore_timeout(image.snapshot_path.to_string_lossy().to_string(), timeout)
        .map_err(Error::from)?;
    Ok(Vm {
        pool: Some(pool),
        vsock_mux_path,
        vsock_exec_path,
        own_vsock_mux_dir: own_dir,
        skip_cleanup: false,
        image_meta: Some(Arc::new(ImageMeta {
            memory_mib,
            vcpus,
            layers: image.layers.clone(),
            delta_squashfs: image.delta_squashfs.clone(),
        })),
    })
}
/// Path of the vsock multiplexer Unix socket on the host.
pub fn vsock_path(&self) -> &Path {
    self.vsock_mux_path.as_path()
}
/// Path of the exec-agent control Unix socket on the host.
pub fn exec_path(&self) -> &Path {
    self.vsock_exec_path.as_path()
}
/// Runs `argv` inside the guest via the exec agent, returning a handle to
/// the spawned child. Convenience wrapper over [`Self::exec_builder`].
pub fn exec<I, S>(&self, argv: I) -> std::io::Result<crate::exec::ExecChild>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let args: Vec<String> = argv.into_iter().map(Into::into).collect();
    // Tracing span covers the whole spawn call; the guard lives until the
    // end of this function.
    let span = tracing::info_span!(
        "supermachine.exec",
        argv0 = args.first().map(String::as_str).unwrap_or(""),
        argc = args.len(),
    );
    let _guard = span.entered();
    self.exec_builder().argv(args).spawn()
}
/// Builder for guest exec requests, bound to this VM's exec socket.
pub fn exec_builder(&self) -> crate::exec::ExecBuilder {
    crate::exec::ExecBuilder::new(self.vsock_exec_path.to_path_buf())
}
pub fn write_file(&self, path: &str, bytes: &[u8]) -> std::io::Result<()> {
let body = serde_json::json!({
"action": "write_file",
"path": path,
"data_b64": b64_encode(bytes),
});
crate::exec::send_control(&self.vsock_exec_path, &body)
}
/// Reads a guest file, capped at a 32 MiB default; use
/// [`Self::read_file_with_max_bytes`] to choose a different cap.
pub fn read_file(&self, path: &str) -> std::io::Result<Vec<u8>> {
    // Default cap: 32 MiB, matching the historical hard-coded limit.
    const DEFAULT_MAX_BYTES: u64 = 32 * 1024 * 1024;
    self.read_file_with_max_bytes(path, DEFAULT_MAX_BYTES)
}
/// Reads a guest file through the exec agent's `read_file` action,
/// refusing payloads beyond `max_bytes` (enforced agent-side — TODO
/// confirm). The agent returns the content base64-encoded in the ack's
/// `data_b64` field; a missing or non-string field maps to
/// `ErrorKind::InvalidData`, as does undecodable base64.
pub fn read_file_with_max_bytes(
    &self,
    path: &str,
    max_bytes: u64,
) -> std::io::Result<Vec<u8>> {
    let request = serde_json::json!({
        "action": "read_file",
        "path": path,
        "max_bytes": max_bytes,
    });
    // 30 s ack timeout covers reading and encoding the file guest-side.
    let ack = crate::exec::send_control_with_ack(
        &self.vsock_exec_path,
        &request,
        Some(std::time::Duration::from_secs(30)),
    )?;
    let encoded = match ack.get("data_b64").and_then(|v| v.as_str()) {
        Some(s) => s,
        None => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "read_file: agent ack missing data_b64",
            ))
        }
    };
    b64_decode(encoded)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
/// Sends Unix signal `signum` to the guest workload via the exec agent's
/// `signal` control action.
pub fn workload_signal(&self, signum: i32) -> std::io::Result<()> {
    let request = serde_json::json!({
        "action": "signal",
        "signum": signum,
    });
    crate::exec::send_control(&self.vsock_exec_path, &request)
}
/// Opens a fresh client connection to the VM's vsock multiplexer socket.
pub fn connect(&self) -> std::io::Result<UnixStream> {
    UnixStream::connect(self.vsock_mux_path.as_path())
}
/// Forwards `127.0.0.1:host_port` into the guest through the vsock mux.
///
/// Binds immediately (so `host_port == 0` picks a free port — read it back
/// via [`TcpForwarder::local_addr`]) and serves connections on a background
/// thread until the returned [`TcpForwarder`] is stopped or dropped.
///
/// NOTE(review): `_guest_port` is currently unused — each accepted TCP
/// connection is spliced straight onto the mux socket, so port selection
/// presumably happens guest-side; confirm before relying on it.
///
/// # Errors
/// I/O errors from binding the listener or spawning the accept thread.
pub fn expose_tcp(&self, host_port: u16, _guest_port: u16) -> std::io::Result<TcpForwarder> {
    let listener = TcpListener::bind(("127.0.0.1", host_port))?;
    let bound = listener.local_addr()?;
    // A freshly bound listener is already blocking; the old
    // `set_nonblocking(false)` call was a no-op and has been dropped.
    let stop = Arc::new(AtomicBool::new(false));
    let stop_thread = stop.clone();
    let vsock_path = self.vsock_mux_path.clone();
    // `spawn` already returns `io::Result`; the previous code wrapped that
    // io::Error inside another io::Error for no benefit.
    let handle = std::thread::Builder::new()
        .name(format!("supermachine-tcp-{host_port}"))
        .spawn(move || {
            accept_loop(listener, vsock_path, stop_thread);
        })?;
    Ok(TcpForwarder {
        stop,
        handle: Some(handle),
        bound,
    })
}
/// Gracefully stops the VM: shuts the pool down (propagating failures),
/// then removes the host-side sockets.
pub fn stop(mut self) -> Result<(), Error> {
    // Shut the pool down before touching the sockets it may be serving.
    match self.pool.take() {
        Some(pool) => {
            let _ = pool.shutdown().map_err(Error::from)?;
        }
        None => {}
    }
    self.cleanup_socket();
    Ok(())
}
/// Captures this VM's state into `dest_dir` as a new `Image`
/// (`restore.snap` + `metadata.json`), consuming the VM.
///
/// Only in-process VMs (created by `start`, which populates `image_meta`)
/// can be snapshotted, because the capture is driven through the live pool.
/// On success the VM is shut down, its sockets removed, and `skip_cleanup`
/// set so `Drop` does not repeat the teardown.
pub fn snapshot(mut self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
    let dest_dir = dest_dir.into();
    // `image_meta` is only set by `start`; its absence means this handle
    // did not boot the VM in-process.
    let meta = self.image_meta.clone().ok_or_else(|| {
        Error::vm_msg(
            "Vm::snapshot requires an in-process Vm (use image.start, not image.acquire)"
                .to_owned(),
        )
    })?;
    let pool = self.pool.as_ref().ok_or_else(|| {
        Error::vm_msg("Vm::snapshot: no pool to drive the capture".to_owned())
    })?;
    std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
    let snap_path = dest_dir.join("restore.snap");
    // Drive the capture through the pool. 60 s cap — TODO confirm it is
    // generous enough for very large guest memory sizes.
    let _result = pool
        .snapshot_timeout(
            snap_path.to_string_lossy().to_string(),
            Duration::from_secs(60),
        )
        .map_err(|e| Error::Vm {
            msg: format!("snapshot capture failed: {e:?}"),
            source: None,
        })?;
    // Persist the sizing/layer info `Image::from_snapshot` needs to rebuild
    // an equivalent VM later.
    let metadata = serde_json::json!({
        "memory_mib": meta.memory_mib,
        "vcpus": meta.vcpus,
        "layers": meta
            .layers
            .iter()
            .map(|p| p.to_string_lossy().to_string())
            .collect::<Vec<_>>(),
        "delta_squashfs": meta
            .delta_squashfs
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
        "snapshot_base": snap_path.to_string_lossy().to_string(),
        "baked_at": chrono_rfc3339_now(),
        "source": "Vm::snapshot",
    });
    std::fs::write(
        dest_dir.join("metadata.json"),
        serde_json::to_string_pretty(&metadata)
            .map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
    )
    .map_err(Error::Io)?;
    // Tear down now, then flag the teardown as done so Drop skips it.
    if let Some(pool) = self.pool.take() {
        let _ = pool.shutdown();
    }
    self.cleanup_socket();
    self.skip_cleanup = true;
    Image::from_snapshot(&dest_dir)
}
/// Best-effort removal of the host-side sockets (and the mux dir, if we
/// created it). Failures are ignored — files may already be gone.
fn cleanup_socket(&self) {
    for sock in [&self.vsock_mux_path, &self.vsock_exec_path] {
        let _ = std::fs::remove_file(sock);
    }
    if let Some(dir) = &self.own_vsock_mux_dir {
        let _ = std::fs::remove_dir(dir);
    }
}
}
/// Standard (RFC 4648) base64 alphabet shared by `b64_encode`/`b64_decode`.
const B64_ALPHA: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes `bytes` as standard base64 with `=` padding.
pub fn b64_encode(bytes: &[u8]) -> String {
    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
    let mut chunks = bytes.chunks_exact(3);
    for c in &mut chunks {
        let n = (u32::from(c[0]) << 16) | (u32::from(c[1]) << 8) | u32::from(c[2]);
        out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
        out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
        out.push(B64_ALPHA[((n >> 6) & 0x3f) as usize] as char);
        out.push(B64_ALPHA[(n & 0x3f) as usize] as char);
    }
    // Trailing 1 or 2 bytes produce a padded final quartet.
    match chunks.remainder() {
        [b0] => {
            let n = u32::from(*b0) << 16;
            out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
            out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
            out.push_str("==");
        }
        [b0, b1] => {
            let n = (u32::from(*b0) << 16) | (u32::from(*b1) << 8);
            out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
            out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
            out.push(B64_ALPHA[((n >> 6) & 0x3f) as usize] as char);
            out.push('=');
        }
        _ => {}
    }
    out
}

/// Decodes standard base64, tolerating ASCII whitespace (line-wrapped
/// input) but rejecting bad lengths, invalid characters, and — fixed here —
/// `=` padding anywhere other than the tail of the final quartet
/// (the previous version silently decoded inputs like `"A==A"`).
///
/// # Errors
/// Returns a human-readable message describing the first malformation.
pub fn b64_decode(s: &str) -> Result<Vec<u8>, String> {
    // Reverse lookup table: byte -> 6-bit value, 255 = invalid.
    let mut tbl = [255u8; 256];
    for (i, &b) in B64_ALPHA.iter().enumerate() {
        tbl[b as usize] = i as u8;
    }
    let bytes: Vec<u8> = s.bytes().filter(|b| !b.is_ascii_whitespace()).collect();
    if bytes.len() % 4 != 0 {
        return Err(format!("base64 length {} is not a multiple of 4", bytes.len()));
    }
    let n_chunks = bytes.len() / 4;
    let mut out = Vec::with_capacity(bytes.len() / 4 * 3);
    for (ci, chunk) in bytes.chunks_exact(4).enumerate() {
        let v: [u8; 4] = chunk.try_into().unwrap();
        // Padding is only legal as "xx==" or "xxx=" in the final quartet.
        let pad = match v {
            [x0, x1, b'=', b'='] if x0 != b'=' && x1 != b'=' => 2,
            [x0, x1, x2, b'='] if x0 != b'=' && x1 != b'=' && x2 != b'=' => 1,
            _ if v.contains(&b'=') => {
                return Err("base64 padding '=' in an invalid position".to_owned());
            }
            _ => 0,
        };
        if pad > 0 && ci + 1 != n_chunks {
            return Err("base64 padding '=' before end of input".to_owned());
        }
        let mut acc: u32 = 0;
        for &b in &v {
            let d = if b == b'=' { 0 } else { tbl[b as usize] };
            if b != b'=' && d == 255 {
                return Err(format!("invalid base64 character {:#x}", b));
            }
            acc = (acc << 6) | (d as u32);
        }
        // Emit 3, 2 or 1 bytes depending on padding.
        out.push(((acc >> 16) & 0xff) as u8);
        if pad < 2 {
            out.push(((acc >> 8) & 0xff) as u8);
        }
        if pad < 1 {
            out.push((acc & 0xff) as u8);
        }
    }
    Ok(out)
}
impl Drop for Vm {
    /// Best-effort teardown for VMs that were not explicitly stopped or
    /// snapshotted; those paths set `skip_cleanup` after cleaning up.
    fn drop(&mut self) {
        if !self.skip_cleanup {
            if let Some(pool) = self.pool.take() {
                let _ = pool.shutdown();
            }
            self.cleanup_socket();
        }
    }
}
/// Handle to a background host→guest TCP forwarder; stopping (or dropping)
/// it shuts the accept thread down and joins it.
pub struct TcpForwarder {
    // Signals the accept loop to exit on its next iteration.
    stop: Arc<AtomicBool>,
    // Accept-loop thread; taken (and joined) exactly once during shutdown.
    handle: Option<JoinHandle<()>>,
    // The actually bound local address (resolves port 0 to the real port).
    bound: SocketAddr,
}
impl TcpForwarder {
    /// The address the forwarder is listening on (useful when bound to port 0).
    pub fn local_addr(&self) -> SocketAddr {
        self.bound
    }

    /// Stops the forwarder and waits for its accept thread to exit.
    pub fn stop(mut self) {
        self.shutdown();
    }

    /// Raises the stop flag, wakes the (blocking) accept call with a
    /// throwaway local connection, then joins the thread. Safe to run more
    /// than once: `handle.take()` yields the thread only the first time.
    fn shutdown(&mut self) {
        self.stop.store(true, Ordering::SeqCst);
        // Wake-up poke; the result is irrelevant (the loop may already be gone).
        let _ = TcpStream::connect_timeout(&self.bound, Duration::from_millis(200));
        match self.handle.take() {
            Some(worker) => {
                let _ = worker.join();
            }
            None => {}
        }
    }
}
impl Drop for TcpForwarder {
    // Ensure the accept thread is stopped and joined even if the caller
    // never invoked `stop()`; `shutdown` tolerates being called after an
    // explicit stop because `handle.take()` only yields the thread once.
    fn drop(&mut self) {
        self.shutdown();
    }
}
/// Accepts TCP connections and splices each onto the vsock mux socket in
/// its own thread, until `stop` is raised. The flag is only observed after
/// `accept` returns, so stopping relies on a wake-up connection (see
/// `TcpForwarder::shutdown`). Accept errors are skipped, not fatal.
fn accept_loop(listener: TcpListener, vsock_path: PathBuf, stop: Arc<AtomicBool>) {
    loop {
        if stop.load(Ordering::SeqCst) {
            break;
        }
        let tcp = match listener.accept() {
            Ok((stream, _peer)) => stream,
            Err(_) => continue,
        };
        let vsock = vsock_path.clone();
        // Spawn failures are ignored: the connection is simply dropped.
        let _ = std::thread::Builder::new()
            .name("supermachine-tcp-conn".into())
            .spawn(move || {
                if let Err(e) = splice_tcp_to_unix(tcp, &vsock) {
                    eprintln!("supermachine: tcp forward: {e}");
                }
            });
    }
}
/// Bidirectionally copies bytes between an accepted TCP connection and a
/// fresh connection to the vsock mux socket, one pump thread per direction.
/// Returns once both directions have finished (EOF or error).
fn splice_tcp_to_unix(tcp: TcpStream, vsock_path: &Path) -> std::io::Result<()> {
    let unix = UnixStream::connect(vsock_path)?;
    // Clone the handles so each direction owns a reader and a writer.
    let tcp_reader = tcp;
    let tcp_writer = tcp_reader.try_clone()?;
    let unix_reader = unix;
    let unix_writer = unix_reader.try_clone()?;
    let client_to_guest = std::thread::Builder::new()
        .name("supermachine-tcp-c2g".into())
        .spawn(move || {
            let _ = pump(tcp_reader, unix_writer);
        })?;
    let guest_to_client = std::thread::Builder::new()
        .name("supermachine-tcp-g2c".into())
        .spawn(move || {
            let _ = pump(unix_reader, tcp_writer);
        })?;
    let _ = client_to_guest.join();
    let _ = guest_to_client.join();
    Ok(())
}
/// Copies bytes from `r` to `w` until EOF or an error, retrying on
/// `Interrupted` reads, then half-closes `w`'s write side so the peer sees
/// EOF. Returns the first non-retryable I/O error, if any.
fn pump<R, W>(mut r: R, mut w: W) -> std::io::Result<()>
where
    R: Read,
    W: Write + Shutdownable,
{
    let mut buf = [0u8; 16 * 1024];
    loop {
        match r.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => w.write_all(&buf[..n])?,
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    // Best-effort EOF signal; the stream may already be closed.
    let _ = w.shutdown_write();
    Ok(())
}
/// Half-close abstraction so `pump` can signal EOF to the peer once the
/// read side finishes, uniformly over TCP and Unix-domain streams.
trait Shutdownable {
    /// Shuts down the write half of the stream (peer observes EOF).
    fn shutdown_write(&mut self) -> std::io::Result<()>;
}
impl Shutdownable for TcpStream {
    /// Delegates to the inherent `TcpStream::shutdown` with `Shutdown::Write`.
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        self.shutdown(std::net::Shutdown::Write)
    }
}
impl Shutdownable for UnixStream {
    /// Delegates to the inherent `UnixStream::shutdown` with `Shutdown::Write`.
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        self.shutdown(std::net::Shutdown::Write)
    }
}
/// Process-local unique-ish suffix for socket names: wall-clock nanoseconds
/// plus a monotonically increasing counter, wrapping on overflow. Clock
/// errors degrade to counter-only values rather than panicking.
fn unique_suffix() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let now_nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(d) => d.as_nanos() as u64,
        Err(_) => 0,
    };
    now_nanos.wrapping_add(seq)
}
/// RFC 3339 UTC timestamp for "now", e.g. `2024-05-01T12:34:56Z`, without
/// pulling in the `chrono` crate. Clock errors (time before the epoch)
/// degrade to the epoch itself rather than panicking.
fn chrono_rfc3339_now() -> String {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs() as i64)
        .unwrap_or(0);
    format_rfc3339_utc(secs)
}

/// Formats `secs` (seconds since the Unix epoch, UTC) as RFC 3339.
/// Extracted from `chrono_rfc3339_now` so the date math is pure and
/// testable independently of the wall clock.
fn format_rfc3339_utc(secs: i64) -> String {
    // Time-of-day: euclidean div/rem keeps pre-epoch values well-behaved.
    let days = secs.div_euclid(86_400);
    let sod = secs.rem_euclid(86_400);
    let hh = sod / 3600;
    let mm = (sod % 3600) / 60;
    let ss = sod % 60;
    // Civil-from-days (Howard Hinnant's algorithm): shift the epoch to
    // 0000-03-01 so leap days fall at the end of each 400-year era, then
    // decompose into era / year-of-era / day-of-year / month / day.
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = z - era * 146_097;
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let y = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    let mp = (5 * doy + 2) / 153;
    let d = doy - (153 * mp + 2) / 5 + 1;
    // `mp` counts months from March; fold back to a calendar month and bump
    // the year for January/February.
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    let y = if m <= 2 { y + 1 } else { y };
    format!("{y:04}-{m:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
}
#[cfg(test)]
mod map_bake_error_tests {
    use super::{map_bake_error, Error};

    /// `KERNEL_PANIC|<first line>|<frame>\x1F<frame>...` sentinels from the
    /// bake pipeline must decode into a structured `Error::KernelPanic`.
    #[test]
    fn decodes_kernel_panic_sentinel() {
        // Was `format!(...)` with zero interpolations (clippy
        // `useless_format`); a plain owned string suffices.
        let msg =
            "KERNEL_PANIC|Kernel panic - not syncing: oops|frame_a\x1Fframe_b\x1Fframe_c"
                .to_owned();
        let err = map_bake_error("nginx:alpine", msg);
        match err {
            Error::KernelPanic { first_line, stack } => {
                assert!(first_line.contains("Kernel panic"));
                assert_eq!(stack, vec!["frame_a", "frame_b", "frame_c"]);
            }
            other => panic!("expected KernelPanic, got {other:?}"),
        }
    }

    /// A panic sentinel with an empty stack section must still decode;
    /// the stack may be empty or hold a single empty frame.
    #[test]
    fn kernel_panic_with_empty_stack() {
        let msg = "KERNEL_PANIC|Internal error: Oops|".to_owned();
        match map_bake_error("img", msg) {
            Error::KernelPanic { first_line, stack } => {
                assert_eq!(first_line, "Internal error: Oops");
                assert!(stack.len() <= 1);
            }
            other => panic!("expected KernelPanic, got {other:?}"),
        }
    }

    /// Messages without the sentinel pass through as plain `Error::Bake`.
    #[test]
    fn non_panic_messages_unchanged() {
        let err = map_bake_error("img", "supermachine snapshot timeout; see bake.log".to_owned());
        match err {
            Error::Bake { msg, .. } => assert!(msg.contains("timeout")),
            other => panic!("expected Bake, got {other:?}"),
        }
    }
}