use std::collections::VecDeque;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::assets::AssetPaths;
use crate::vmm::pool::{PoolClientError, WarmPool, WarmPoolError};
use crate::vmm::resources::VmResources;
use crate::vmm::runner::RunOptions;
/// Errors returned by the supermachine image / VM / pool APIs.
///
/// Marked `#[non_exhaustive]` so new variants can be added without a
/// semver-breaking change; downstream `match`es need a catch-all arm.
#[non_exhaustive]
pub enum Error {
    /// Snapshot/image loading or validation failed.
    Image {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// VM / worker lifecycle failure (spawn, handshake, restore, pool state).
    Vm {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// A required on-disk asset (e.g. the worker binary) could not be found.
    Assets {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// Raw I/O error with no additional context.
    Io(std::io::Error),
    /// Networking failure outside of registry-specific cases below.
    Network {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// No cached snapshot exists (see `PullPolicy::Never` handling).
    CacheMiss {
        msg: String,
    },
    /// A cached snapshot exists but cannot be loaded by this binary.
    CacheInvalid {
        msg: String,
    },
    /// Baking a snapshot from an OCI image failed.
    Bake {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// The OCI image reference does not exist in the registry.
    ImageNotFound {
        image: String,
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// The registry rejected our credentials for this image.
    RegistryAuth {
        image: String,
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// The registry could not be reached at all.
    RegistryUnreachable {
        msg: String,
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// `acquire` timed out while the pool was at its `max` size.
    PoolExhausted {
        msg: String,
    },
}
impl Error {
pub(crate) fn image_msg(msg: impl Into<String>) -> Self {
Error::Image { msg: msg.into(), source: None }
}
pub(crate) fn vm_msg(msg: impl Into<String>) -> Self {
Error::Vm { msg: msg.into(), source: None }
}
pub(crate) fn assets_msg(msg: impl Into<String>) -> Self {
Error::Assets { msg: msg.into(), source: None }
}
pub(crate) fn network_msg(msg: impl Into<String>) -> Self {
Error::Network { msg: msg.into(), source: None }
}
pub(crate) fn bake_msg(msg: impl Into<String>) -> Self {
Error::Bake { msg: msg.into(), source: None }
}
pub(crate) fn cache_miss(msg: impl Into<String>) -> Self {
Error::CacheMiss { msg: msg.into() }
}
pub(crate) fn cache_invalid(msg: impl Into<String>) -> Self {
Error::CacheInvalid { msg: msg.into() }
}
pub(crate) fn image_not_found(image: impl Into<String>, msg: impl Into<String>) -> Self {
Error::ImageNotFound { image: image.into(), msg: msg.into(), source: None }
}
pub(crate) fn registry_auth(image: impl Into<String>, msg: impl Into<String>) -> Self {
Error::RegistryAuth { image: image.into(), msg: msg.into(), source: None }
}
pub(crate) fn registry_unreachable(msg: impl Into<String>) -> Self {
Error::RegistryUnreachable { msg: msg.into(), source: None }
}
pub(crate) fn pool_exhausted(msg: impl Into<String>) -> Self {
Error::PoolExhausted { msg: msg.into() }
}
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, source } => f
.debug_struct("Image")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Vm { msg, source } => f
.debug_struct("Vm")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Assets { msg, source } => f
.debug_struct("Assets")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Io(e) => f.debug_tuple("Io").field(e).finish(),
Error::Network { msg, source } => f
.debug_struct("Network")
.field("msg", msg)
.field("source", source)
.finish(),
Error::CacheMiss { msg } => f.debug_struct("CacheMiss").field("msg", msg).finish(),
Error::CacheInvalid { msg } => {
f.debug_struct("CacheInvalid").field("msg", msg).finish()
}
Error::Bake { msg, source } => f
.debug_struct("Bake")
.field("msg", msg)
.field("source", source)
.finish(),
Error::ImageNotFound { image, msg, source } => f
.debug_struct("ImageNotFound")
.field("image", image)
.field("msg", msg)
.field("source", source)
.finish(),
Error::RegistryAuth { image, msg, source } => f
.debug_struct("RegistryAuth")
.field("image", image)
.field("msg", msg)
.field("source", source)
.finish(),
Error::RegistryUnreachable { msg, source } => f
.debug_struct("RegistryUnreachable")
.field("msg", msg)
.field("source", source)
.finish(),
Error::PoolExhausted { msg } => {
f.debug_struct("PoolExhausted").field("msg", msg).finish()
}
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, .. } => write!(f, "image: {msg}"),
Error::Vm { msg, .. } => write!(f, "vm: {msg}"),
Error::Assets { msg, .. } => write!(f, "assets: {msg}"),
Error::Io(e) => write!(f, "io: {e}"),
Error::Network { msg, .. } => write!(f, "network: {msg}"),
Error::CacheMiss { msg } => write!(f, "cache miss: {msg}"),
Error::CacheInvalid { msg } => write!(f, "cache invalid: {msg}"),
Error::Bake { msg, .. } => write!(f, "bake: {msg}"),
Error::ImageNotFound { image, msg, .. } => {
write!(f, "image not found ({image}): {msg}")
}
Error::RegistryAuth { image, msg, .. } => {
write!(f, "registry auth failed for {image}: {msg}")
}
Error::RegistryUnreachable { msg, .. } => {
write!(f, "registry unreachable: {msg}")
}
Error::PoolExhausted { msg } => write!(f, "pool exhausted: {msg}"),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Image { source, .. }
| Error::Vm { source, .. }
| Error::Assets { source, .. }
| Error::Network { source, .. }
| Error::Bake { source, .. }
| Error::ImageNotFound { source, .. }
| Error::RegistryAuth { source, .. }
| Error::RegistryUnreachable { source, .. } => {
source.as_ref().map(|s| s.as_ref() as &(dyn std::error::Error + 'static))
}
Error::Io(e) => Some(e),
Error::CacheMiss { .. }
| Error::CacheInvalid { .. }
| Error::PoolExhausted { .. } => None,
}
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
}
}
impl From<WarmPoolError> for Error {
fn from(e: WarmPoolError) -> Self {
Error::Vm {
msg: e.to_string(),
source: Some(Box::new(e)),
}
}
}
impl From<PoolClientError> for Error {
fn from(e: PoolClientError) -> Self {
Error::Vm {
msg: e.to_string(),
source: Some(Box::new(e)),
}
}
}
/// When to pull/bake an OCI image versus reuse a cached snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PullPolicy {
    /// Always pull and rebake, even if a loadable cached snapshot exists.
    Always,
    /// Pull only when no loadable cached snapshot is present (the default).
    Missing,
    /// Never pull; fail with a cache error if the snapshot is absent/invalid.
    Never,
}
impl Default for PullPolicy {
fn default() -> Self {
Self::Missing
}
}
impl PullPolicy {
fn as_bake_str(self) -> &'static str {
match self {
Self::Always => "always",
Self::Missing => "missing",
Self::Never => "never",
}
}
}
/// A baked VM image: a restorable snapshot plus the metadata needed to
/// spawn workers from it (resources, disk layers, optional bundled kernel).
#[derive(Debug, Clone)]
pub struct Image {
    // Path to the `restore.snap` file workers restore from.
    snapshot_path: PathBuf,
    // Guest memory size in MiB (from metadata, default 256).
    pub(crate) memory_mib: u32,
    // Guest vCPU count (from metadata, default 1).
    pub(crate) vcpus: u32,
    // virtio-blk disk layer paths, in attach order.
    pub(crate) layers: Vec<PathBuf>,
    // Optional writable-delta squashfs attached after the layers.
    pub(crate) delta_squashfs: Option<PathBuf>,
    // Kernel file found next to metadata.json, if present.
    pub(crate) bundled_kernel: Option<PathBuf>,
    // Lazily-created default pool backing `acquire()`; set at most once.
    pub(crate) hidden_pool: std::sync::OnceLock<Arc<HiddenPool>>,
}
/// Internal worker pool shared behind an `Arc` by an [`Image`] and its
/// background threads (replenisher / restorers / janitor).
#[doc(hidden)]
pub struct HiddenPool {
    // Idle workers + liveness accounting, guarded by one mutex.
    state: Arc<Mutex<PoolState>>,
    // Signalled whenever idle workers or capacity may have become available.
    available: Arc<Condvar>,
    // Released-but-dirty workers awaiting re-restore (None when
    // restore_on_release is handled by shutting workers down instead).
    dirty: Option<Arc<Mutex<VecDeque<Worker>>>>,
    // Signalled when a worker is pushed onto `dirty`.
    dirty_pending: Option<Arc<Condvar>>,
    // Directory holding the per-worker unix sockets; removed on drop.
    socks_dir: PathBuf,
    // Set on drop; tells background threads and waiters to exit.
    shutting_down: Arc<AtomicBool>,
    // Everything needed to spawn one more worker.
    spawn_cfg: Arc<SpawnConfig>,
    policy: PoolPolicy,
}
/// Sizing and timeout knobs for a [`HiddenPool`].
#[derive(Debug, Clone, Copy)]
struct PoolPolicy {
    // Workers the replenisher keeps pre-warmed.
    min: usize,
    // Hard cap on concurrently alive workers.
    max: usize,
    // Idle workers older than this are evicted by the janitor
    // (Duration::MAX disables the janitor).
    idle_timeout: Duration,
    // Total time `acquire` may block; None waits forever.
    acquire_timeout: Option<Duration>,
    // If true, released workers are re-restored before reuse; if false they
    // go straight back to the idle list.
    restore_on_release: bool,
}
impl Default for PoolPolicy {
fn default() -> Self {
Self {
min: 0,
max: 64,
idle_timeout: Duration::from_secs(60),
acquire_timeout: Some(Duration::from_secs(60)),
restore_on_release: true,
}
}
}
/// Mutable pool accounting, guarded by `HiddenPool::state`.
struct PoolState {
    // Ready-to-hand-out workers; pushed/popped at the back, evicted from
    // the front (oldest `last_used` first).
    idle: Vec<IdleEntry>,
    // Count of live workers: idle + checked out + dirty + being spawned.
    alive: usize,
    // Number of `acquire` callers currently blocked on the condvar.
    waiting: usize,
}
/// An idle worker plus the timestamp the janitor uses for eviction.
struct IdleEntry {
    worker: Worker,
    // When this worker last entered the idle list.
    last_used: Instant,
}
/// A spawned worker process plus its unix-socket endpoints.
struct Worker {
    // The worker OS process; killed/reaped in `shutdown`.
    child: Child,
    // Vsock mux socket path (a sibling `.handoff` file may also exist).
    vsock_mux_path: PathBuf,
    // Vsock exec socket path, used for agent control messages.
    vsock_exec_path: PathBuf,
    // Control socket path used for the line-based RESTORE/SNAPSHOT protocol.
    control_path: PathBuf,
    // Connected control channel; Mutex so `send_*` can take `&self`.
    control: Arc<Mutex<ControlChannel>>,
}
/// Timing/size figures parsed from a worker's `DONE_SNAPSHOT` reply.
struct SnapshotStats {
    // Bytes written to the snapshot file.
    bytes_written: u64,
    // Microseconds spent capturing guest state.
    capture_us: u64,
    // Microseconds spent persisting the snapshot.
    save_us: u64,
}
/// Line-oriented control connection to a worker: buffered reader plus a
/// cloned raw stream for writes (both halves of the same unix socket).
struct ControlChannel {
    reader: std::io::BufReader<std::os::unix::net::UnixStream>,
    writer: std::os::unix::net::UnixStream,
}
impl ControlChannel {
    /// Write `line` to the worker, appending a trailing `\n` when the
    /// caller did not include one, then flush so it is sent immediately.
    fn send_line(&mut self, line: &str) -> std::io::Result<()> {
        use std::io::Write;
        self.writer.write_all(line.as_bytes())?;
        let needs_newline = !line.ends_with('\n');
        if needs_newline {
            self.writer.write_all(b"\n")?;
        }
        self.writer.flush()
    }
    /// Read one newline-terminated line (newline included). A zero-byte
    /// read means the worker closed its end and is reported as
    /// `UnexpectedEof` rather than an empty string.
    fn read_line(&mut self) -> std::io::Result<String> {
        use std::io::BufRead;
        let mut line = String::new();
        match self.reader.read_line(&mut line)? {
            0 => Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "worker control socket closed",
            )),
            _ => Ok(line),
        }
    }
}
impl Worker {
    /// Ask the worker to restore the VM from `snapshot_path`.
    ///
    /// Protocol: send `RESTORE <path>`, expect a reply starting with `DONE`.
    /// Fails if the path is not UTF-8, on any socket error, or on an
    /// unexpected reply.
    fn send_restore(&self, snapshot_path: &Path) -> Result<(), Error> {
        let path_str = snapshot_path
            .to_str()
            .ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
        let mut ctl = self
            .control
            .lock()
            .map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
        ctl.send_line(&format!("RESTORE {path_str}"))
            .map_err(Error::Io)?;
        let line = ctl.read_line().map_err(Error::Io)?;
        if line.starts_with("DONE") {
            Ok(())
        } else {
            Err(Error::vm_msg(format!(
                "worker RESTORE: expected DONE response, got: {}",
                line.trim()
            )))
        }
    }
    /// Ask the worker to snapshot the VM to `out_path`.
    ///
    /// Protocol: send `SNAPSHOT <path>`; the worker replies
    /// `DONE_SNAPSHOT [bytes_written=N] [capture_us=N] [save_us=N]` on
    /// success or `ERR_SNAPSHOT <reason>` on failure. Unparseable stat
    /// values fall back to 0 rather than erroring.
    fn send_snapshot(&self, out_path: &Path) -> Result<SnapshotStats, Error> {
        let path_str = out_path
            .to_str()
            .ok_or_else(|| Error::vm_msg("snapshot path is not valid UTF-8".to_owned()))?;
        let mut ctl = self
            .control
            .lock()
            .map_err(|_| Error::vm_msg("worker control mutex poisoned".to_owned()))?;
        ctl.send_line(&format!("SNAPSHOT {path_str}"))
            .map_err(Error::Io)?;
        let line = ctl.read_line().map_err(Error::Io)?;
        if let Some(rest) = line.strip_prefix("DONE_SNAPSHOT") {
            let mut stats = SnapshotStats {
                bytes_written: 0,
                capture_us: 0,
                save_us: 0,
            };
            // Parse the optional key=value stats that follow the tag.
            for kv in rest.split_ascii_whitespace() {
                if let Some(v) = kv.strip_prefix("bytes_written=") {
                    stats.bytes_written = v.parse().unwrap_or(0);
                } else if let Some(v) = kv.strip_prefix("capture_us=") {
                    stats.capture_us = v.parse().unwrap_or(0);
                } else if let Some(v) = kv.strip_prefix("save_us=") {
                    stats.save_us = v.parse().unwrap_or(0);
                }
            }
            Ok(stats)
        } else if let Some(rest) = line.strip_prefix("ERR_SNAPSHOT ") {
            Err(Error::vm_msg(format!(
                "worker SNAPSHOT failed: {}",
                rest.trim()
            )))
        } else {
            Err(Error::vm_msg(format!(
                "worker SNAPSHOT: unexpected response: {}",
                line.trim()
            )))
        }
    }
    /// Best-effort teardown: politely ask the worker to QUIT, give it
    /// 100 ms to exit, then kill and reap it; finally remove all socket
    /// files (including the derived `.handoff` sibling). Never fails.
    fn shutdown(&mut self) {
        if let Ok(mut ctl) = self.control.lock() {
            let _ = ctl.send_line("QUIT");
        }
        let deadline = Instant::now() + Duration::from_millis(100);
        loop {
            match self.child.try_wait() {
                // Already exited and reaped.
                Ok(Some(_)) => break,
                // Still running and within the grace period: poll again.
                Ok(None) if Instant::now() < deadline => {
                    std::thread::sleep(Duration::from_millis(2));
                }
                // Grace period over (or try_wait errored): force-kill.
                _ => {
                    let _ = self.child.kill();
                    let _ = self.child.wait();
                    break;
                }
            }
        }
        let _ = std::fs::remove_file(&self.vsock_mux_path);
        let _ = std::fs::remove_file(&self.vsock_exec_path);
        let _ = std::fs::remove_file(&self.control_path);
        // The mux socket may have a companion "<name>.handoff" file.
        let mut h = self.vsock_mux_path.clone();
        h.set_extension("handoff");
        let _ = std::fs::remove_file(&h);
    }
}
/// Immutable recipe for spawning one pool worker process.
struct SpawnConfig {
    // Path to the supermachine-worker executable.
    worker_bin: PathBuf,
    // Snapshot file passed via --restore-from and the initial RESTORE.
    snapshot_path: PathBuf,
    // virtio-blk layers, attached in order.
    layers: Vec<PathBuf>,
    // Optional delta squashfs attached after the layers.
    delta_squashfs: Option<PathBuf>,
    memory_mib: u32,
    vcpus: u32,
    // Directory where per-worker unix sockets are created.
    socks_dir: PathBuf,
    // Socket filename prefix (sockets are "<prefix>-<hex>.sock[...]").
    name_prefix: String,
    // Budget for the worker to connect back on the control socket.
    spawn_timeout: Duration,
}
impl SpawnConfig {
fn spawn_one(&self) -> Result<Worker, Error> {
use std::os::unix::net::UnixListener;
let suffix = unique_suffix();
let vsock_mux_path = self
.socks_dir
.join(format!("{}-{:016x}.sock", self.name_prefix, suffix));
let vsock_exec_path = {
let mut p = vsock_mux_path.clone();
let mut name = p.file_name().unwrap().to_owned();
name.push("-exec");
p.set_file_name(name);
p
};
let control_path = {
let mut p = vsock_mux_path.clone();
let mut name = p.file_name().unwrap().to_owned();
name.push("-ctl");
p.set_file_name(name);
p
};
let _ = std::fs::remove_file(&vsock_mux_path);
let _ = std::fs::remove_file(&vsock_exec_path);
let _ = std::fs::remove_file(&control_path);
let ctl_listener = UnixListener::bind(&control_path).map_err(|e| {
Error::vm_msg(format!(
"bind control socket {}: {e}",
control_path.display()
))
})?;
let mut cmd = Command::new(&self.worker_bin);
for layer in &self.layers {
cmd.arg("--virtio-blk").arg(layer);
}
if let Some(delta) = &self.delta_squashfs {
cmd.arg("--virtio-blk").arg(delta);
}
cmd.arg("--memory").arg(self.memory_mib.to_string());
cmd.arg("--vcpus").arg(self.vcpus.to_string());
cmd.arg("--restore-from").arg(&self.snapshot_path);
cmd.arg("--cow-restore");
cmd.arg("--vsock-mux").arg(&vsock_mux_path);
cmd.arg("--vsock-exec").arg(&vsock_exec_path);
cmd.arg("--pool-worker").arg(&control_path);
let log_to_stdio = std::env::var("SUPERMACHINE_WORKER_LOG")
.map(|v| v == "1" || v == "true")
.unwrap_or(false);
if !log_to_stdio {
cmd.stdout(Stdio::null()).stderr(Stdio::null());
}
let __t0 = Instant::now();
let child = cmd
.spawn()
.map_err(|e| Error::vm_msg(format!("spawn worker {}: {e}", self.worker_bin.display())))?;
let __t_spawned = __t0.elapsed();
ctl_listener
.set_nonblocking(true)
.map_err(|e| Error::vm_msg(format!("set control listener nonblocking: {e}")))?;
let deadline = Instant::now() + self.spawn_timeout;
let mut backoff = Duration::from_millis(1);
let stream = loop {
match ctl_listener.accept() {
Ok((s, _)) => break s,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
if Instant::now() > deadline {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: control connect did not arrive within {:?}",
self.spawn_timeout
)));
}
std::thread::sleep(backoff);
backoff = (backoff * 2).min(Duration::from_millis(10));
}
Err(e) => {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: control accept: {e}"
)));
}
}
};
stream
.set_nonblocking(false)
.map_err(|e| Error::vm_msg(format!("set control stream blocking: {e}")))?;
let writer = stream
.try_clone()
.map_err(|e| Error::vm_msg(format!("clone control stream: {e}")))?;
let mut control = ControlChannel {
reader: std::io::BufReader::new(stream),
writer,
};
let ready = match control.read_line() {
Ok(l) => l,
Err(e) => {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: read READY: {e}"
)));
}
};
if ready.trim() != "READY" {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: expected READY, got: {}",
ready.trim()
)));
}
let snap_path_str = self.snapshot_path.to_string_lossy().to_string();
if let Err(e) = control.send_line(&format!("RESTORE {snap_path_str}")) {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: send initial RESTORE: {e}"
)));
}
let done = match control.read_line() {
Ok(l) => l,
Err(e) => {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: read initial DONE: {e}"
)));
}
};
if !done.starts_with("DONE") {
let _ = std::fs::remove_file(&control_path);
return Err(Error::vm_msg(format!(
"worker spawn: expected initial DONE, got: {}",
done.trim()
)));
}
if std::env::var_os("SUPERMACHINE_TIMINGS").is_some() {
eprintln!(
"[spawn_one] spawn={:?} accept_to_done={:?} total={:?}",
__t_spawned,
__t0.elapsed() - __t_spawned,
__t0.elapsed()
);
}
if let Err(e) = probe_agent_protocol(&vsock_exec_path) {
let _ = std::fs::remove_file(&control_path);
return Err(e);
}
Ok(Worker {
child,
vsock_mux_path,
vsock_exec_path,
control_path,
control: Arc::new(Mutex::new(control)),
})
}
}
/// Minimum guest-agent protocol version this host library can speak.
const HOST_AGENT_PROTOCOL_MIN: u32 = 2;
/// Probe the guest agent over the exec vsock and verify it speaks at least
/// [`HOST_AGENT_PROTOCOL_MIN`]. A probe failure or an old version both mean
/// the snapshot was baked against an older release and must be rebaked.
fn probe_agent_protocol(vsock_exec_path: &Path) -> Result<(), Error> {
    let body = serde_json::json!({ "action": "probe" });
    let ack = match crate::exec::send_control_with_ack(
        vsock_exec_path,
        &body,
        Some(Duration::from_secs(10)),
    ) {
        Ok(a) => a,
        Err(e) => {
            return Err(Error::vm_msg(format!(
                "agent in this snapshot is from an older supermachine release \
                 (probe failed: {e}). Rebake the snapshot to pick up the new \
                 agent: rm -rf the snapshot dir and re-run your bake (e.g. \
                 `supermachine pull <image> --name <name>`)."
            )));
        }
    };
    // Compare in u64: the previous `as u32` cast silently truncated large
    // values (e.g. 2^32 + 1 would have read back as 1). A missing or
    // non-integer "protocol" field counts as 0 and is rejected.
    let proto = ack.get("protocol").and_then(|v| v.as_u64()).unwrap_or(0);
    if proto < u64::from(HOST_AGENT_PROTOCOL_MIN) {
        return Err(Error::vm_msg(format!(
            "agent in this snapshot speaks protocol v{proto} but this \
             supermachine library expects v{HOST_AGENT_PROTOCOL_MIN}+. The \
             snapshot was baked against a previous release; rebake to pick up \
             the new agent (rm -rf the snapshot dir and re-run your bake)."
        )));
    }
    Ok(())
}
impl std::fmt::Debug for HiddenPool {
    /// Best-effort debug view: if the state mutex is poisoned the counters
    /// are shown as `usize::MAX` instead of panicking inside `fmt`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (alive, idle) = match self.state.lock() {
            Ok(s) => (s.alive, s.idle.len()),
            Err(_) => (usize::MAX, usize::MAX),
        };
        f.debug_struct("HiddenPool")
            .field("socks_dir", &self.socks_dir)
            .field("alive", &alive)
            .field("idle", &idle)
            .finish()
    }
}
impl Drop for HiddenPool {
    /// Tear the pool down: flag shutdown, wake every blocked thread, kill
    /// all idle and dirty workers, and remove the sockets directory.
    /// Checked-out workers are not touched here; their release path sees
    /// `shutting_down` and handles itself.
    fn drop(&mut self) {
        // Flag first, then wake waiters so they observe the flag.
        self.shutting_down.store(true, Ordering::SeqCst);
        self.available.notify_all();
        if let Some(c) = self.dirty_pending.as_ref() {
            c.notify_all();
        }
        if let Ok(mut s) = self.state.lock() {
            // Drain idle workers, keeping `alive` consistent as we go.
            while let Some(mut e) = s.idle.pop() {
                e.worker.shutdown();
                s.alive = s.alive.saturating_sub(1);
            }
            // Then drain any workers still queued for re-restore.
            if let Some(d) = self.dirty.as_ref() {
                if let Ok(mut q) = d.lock() {
                    while let Some(mut w) = q.pop_front() {
                        w.shutdown();
                        s.alive = s.alive.saturating_sub(1);
                    }
                }
            }
        }
        // Best-effort: socket files were already removed per-worker.
        let _ = std::fs::remove_dir_all(&self.socks_dir);
    }
}
impl HiddenPool {
    /// Check a worker out of the pool.
    ///
    /// Order of preference each round: (1) pop an idle worker, (2) spawn a
    /// new one if below `policy.max` (slot reserved under the lock, spawn
    /// done outside it), (3) block on the `available` condvar until either
    /// a worker is returned or `policy.acquire_timeout` elapses.
    ///
    /// Errors: `Vm` on shutdown/poisoned locks/spawn failure,
    /// `PoolExhausted` on timeout at max capacity.
    fn acquire(&self) -> Result<Worker, Error> {
        let acquire_t0 = Instant::now();
        let acquire_timeout = self.policy.acquire_timeout;
        let mut state = self
            .state
            .lock()
            .map_err(|_| Error::vm_msg("pool mutex poisoned".to_owned()))?;
        loop {
            if let Some(entry) = state.idle.pop() {
                return Ok(entry.worker);
            }
            if self.shutting_down.load(Ordering::SeqCst) {
                return Err(Error::vm_msg("pool is shutting down".to_owned()));
            }
            if state.alive < self.policy.max {
                // Reserve the slot before unlocking so concurrent acquirers
                // can't oversubscribe `max` while we spawn.
                state.alive += 1;
                drop(state);
                let spawned = self.spawn_cfg.spawn_one();
                match spawned {
                    Ok(w) => return Ok(w),
                    Err(e) => {
                        // Roll back the reservation and wake waiters that may
                        // now be able to use the freed slot.
                        if let Ok(mut s) = self.state.lock() {
                            s.alive = s.alive.saturating_sub(1);
                        }
                        self.available.notify_all();
                        return Err(e);
                    }
                }
            }
            // At max capacity: time out or wait for a release.
            if let Some(total) = acquire_timeout {
                if acquire_t0.elapsed() >= total {
                    return Err(Error::pool_exhausted(format!(
                        "acquire timed out after {total:?}; pool at max ({})",
                        self.policy.max
                    )));
                }
            }
            state.waiting += 1;
            // Condvar wait releases the lock and re-acquires on wakeup;
            // the timed variant waits only the remaining budget.
            let (new_state, timed_out) = match acquire_timeout {
                None => match self.available.wait(state) {
                    Ok(s) => (s, false),
                    Err(_) => {
                        return Err(Error::vm_msg("pool condvar poisoned".to_owned()))
                    }
                },
                Some(total) => {
                    let remaining = total.saturating_sub(acquire_t0.elapsed());
                    match self.available.wait_timeout(state, remaining) {
                        Ok((s, r)) => (s, r.timed_out()),
                        Err(_) => {
                            return Err(Error::vm_msg(
                                "pool condvar poisoned".to_owned(),
                            ))
                        }
                    }
                }
            };
            state = new_state;
            state.waiting = state.waiting.saturating_sub(1);
            if timed_out {
                return Err(Error::pool_exhausted(format!(
                    "acquire timed out after {:?}; pool at max ({})",
                    acquire_timeout.unwrap_or_default(),
                    self.policy.max
                )));
            }
        }
    }
    /// Return a worker to the pool.
    ///
    /// With `restore_on_release` off, the worker goes straight back to the
    /// idle list. Otherwise it is queued on `dirty` for a restorer thread
    /// to re-restore first; if no dirty queue exists the worker is simply
    /// shut down and its slot freed.
    fn release(&self, worker: Worker) {
        if !self.policy.restore_on_release {
            if let Ok(mut s) = self.state.lock() {
                s.idle.push(IdleEntry {
                    worker,
                    last_used: Instant::now(),
                });
                self.available.notify_all();
            }
            return;
        }
        if let Some(d) = self.dirty.as_ref() {
            if let Ok(mut q) = d.lock() {
                q.push_back(worker);
            }
            // Wake a restorer thread to pick it up.
            if let Some(c) = self.dirty_pending.as_ref() {
                c.notify_all();
            }
        } else {
            // No restorer infrastructure: retire the worker entirely.
            let mut w = worker;
            w.shutdown();
            if let Ok(mut s) = self.state.lock() {
                s.alive = s.alive.saturating_sub(1);
            }
            self.available.notify_all();
        }
    }
}
impl Image {
    /// Load an image from an on-disk snapshot.
    ///
    /// `path` may be the snapshot directory (containing `restore.snap` and
    /// `metadata.json`) or the snapshot file itself (metadata is then read
    /// from the sibling `metadata.json`). Relative paths inside the
    /// metadata are resolved against the metadata file's directory; missing
    /// `memory_mib`/`vcpus` fall back to 256 MiB / 1 vcpu.
    pub fn from_snapshot(path: impl Into<PathBuf>) -> Result<Self, Error> {
        let path = path.into();
        let (snapshot_path, metadata_path) = if path.is_dir() {
            (path.join("restore.snap"), path.join("metadata.json"))
        } else if path.is_file() {
            let parent = path.parent().ok_or_else(|| {
                Error::image_msg(format!("snapshot path has no parent dir: {}", path.display()))
            })?;
            (path.clone(), parent.join("metadata.json"))
        } else {
            return Err(Error::image_msg(format!(
                "snapshot path not found: {}",
                path.display()
            )));
        };
        if !snapshot_path.is_file() {
            return Err(Error::image_msg(format!(
                "snapshot file not found: {}",
                snapshot_path.display()
            )));
        }
        if !metadata_path.is_file() {
            return Err(Error::image_msg(format!(
                "metadata.json not found alongside snapshot at {}",
                metadata_path.display()
            )));
        }
        let meta_text = std::fs::read_to_string(&metadata_path)
            .map_err(|e| Error::image_msg(format!("read {}: {e}", metadata_path.display())))?;
        let meta: serde_json::Value = serde_json::from_str(&meta_text)
            .map_err(|e| Error::image_msg(format!("parse {}: {e}", metadata_path.display())))?;
        let memory_mib = meta
            .get("memory_mib")
            .and_then(|v| v.as_u64())
            .map(|v| v as u32)
            .unwrap_or(256);
        let vcpus = meta
            .get("vcpus")
            .and_then(|v| v.as_u64())
            .map(|v| v as u32)
            .unwrap_or(1);
        let metadata_dir = metadata_path
            .parent()
            .map(Path::to_path_buf)
            .unwrap_or_else(|| PathBuf::from("."));
        // Metadata-relative paths are anchored at the metadata directory.
        let resolve_path = |s: &str| -> PathBuf {
            let p = PathBuf::from(s);
            if p.is_absolute() {
                p
            } else {
                metadata_dir.join(p)
            }
        };
        let layers: Vec<PathBuf> = meta
            .get("layers")
            .and_then(|v| v.as_array())
            .map(|arr| {
                arr.iter()
                    .filter_map(|x| x.as_str().map(&resolve_path))
                    .collect()
            })
            .unwrap_or_default();
        let delta_squashfs = meta
            .get("delta_squashfs")
            .and_then(|v| v.as_str())
            .map(&resolve_path);
        // A kernel file bundled next to metadata.json is picked up
        // automatically when present.
        let bundled_kernel = {
            let cand = metadata_dir.join("kernel");
            if cand.is_file() {
                Some(cand)
            } else {
                None
            }
        };
        Ok(Self {
            snapshot_path,
            memory_mib,
            vcpus,
            layers,
            delta_squashfs,
            bundled_kernel,
            hidden_pool: std::sync::OnceLock::new(),
        })
    }
    /// Load (or bake) an image for an OCI reference using the default
    /// snapshots directory and the default [`PullPolicy`].
    pub fn from_oci(image_ref: &str) -> Result<Self, Error> {
        Self::from_oci_with_policy(image_ref, PullPolicy::default())
    }
    /// Like [`Image::from_oci`] with an explicit pull policy.
    pub fn from_oci_with_policy(
        image_ref: &str,
        policy: PullPolicy,
    ) -> Result<Self, Error> {
        let snapshots_dir = default_snapshots_dir();
        Self::from_oci_to_dir(image_ref, policy, &snapshots_dir, None)
    }
    /// Load (or bake) an image for `image_ref` into `snapshots_dir`.
    ///
    /// The snapshot directory name defaults to one derived from the image
    /// reference unless `name` overrides it. `policy` decides cache reuse:
    /// `Never` only ever reads the cache (distinguishing "missing" from
    /// "present but unloadable"), `Missing` reuses a loadable cache, and
    /// otherwise a bake is run before loading the result.
    pub fn from_oci_to_dir(
        image_ref: &str,
        policy: PullPolicy,
        snapshots_dir: &Path,
        name: Option<&str>,
    ) -> Result<Self, Error> {
        let derived = name
            .map(|s| s.to_owned())
            .unwrap_or_else(|| crate::bake::snapshot_name_for_image(image_ref));
        let snap_dir = snapshots_dir.join(&derived);
        // Probe the cache by attempting a full load.
        let cache_loadable = Self::from_snapshot(&snap_dir).is_ok();
        match policy {
            PullPolicy::Never => {
                if cache_loadable {
                    return Self::from_snapshot(&snap_dir);
                }
                // Distinguish an unloadable snapshot from a missing one so
                // callers get an actionable error.
                let restore_snap = snap_dir.join("restore.snap");
                if restore_snap.is_file() {
                    return Err(Error::cache_invalid(format!(
                        "snapshot present at {} but not loadable on this binary; \
                         rebake required (PullPolicy::Never won't auto-rebake)",
                        snap_dir.display()
                    )));
                }
                return Err(Error::cache_miss(format!(
                    "no cached snapshot for {image_ref} at {} (PullPolicy::Never)",
                    snap_dir.display()
                )));
            }
            PullPolicy::Missing if cache_loadable => {
                return Self::from_snapshot(&snap_dir);
            }
            _ => {}
        }
        // Cache not usable (or Always): run a bake, then load the result.
        let root = repo_root_for_bake()?;
        let request = crate::bake::BakeRequest {
            image: image_ref.to_owned(),
            name: name.map(|s| s.to_owned()),
            runtime: "supermachine".to_owned(),
            guest_port: 80,
            memory_mib: 256,
            vcpus: 1,
            pull_policy: policy.as_bake_str().to_owned(),
            snapshots_dir: snapshots_dir.to_path_buf(),
            cmd_override: None,
            extra_args: Vec::new(),
        };
        let bake_t0 = std::time::Instant::now();
        crate::bake::run_push(&request, bake_t0, &root)
            .map_err(|e| map_bake_error(&request.image, e))?;
        Self::from_snapshot(&snap_dir)
    }
    /// Start a builder for configuring an OCI-backed image bake.
    pub fn builder(image_ref: impl Into<String>) -> OciImageBuilder {
        OciImageBuilder::new(image_ref)
    }
    /// Convenience: build a named image, letting `configure` customize the
    /// builder before `build()` runs.
    pub fn ensure_baked<F>(
        name: impl Into<String>,
        image_ref: impl Into<String>,
        configure: F,
    ) -> Result<Image, Error>
    where
        F: FnOnce(OciImageBuilder) -> OciImageBuilder,
    {
        let builder = configure(
            OciImageBuilder::new(image_ref).with_name(name),
        );
        builder.build()
    }
    /// Path of the `restore.snap` file this image restores from.
    pub fn snapshot_path(&self) -> &Path {
        &self.snapshot_path
    }
    /// Guest memory size in MiB.
    pub fn memory_mib(&self) -> u32 {
        self.memory_mib
    }
    /// Guest vCPU count.
    pub fn vcpus(&self) -> u32 {
        self.vcpus
    }
    /// Start a standalone (non-pooled) VM from this image.
    pub fn start(&self, config: &VmConfig) -> Result<Vm, Error> {
        Vm::start(self, config)
    }
    /// Acquire a pooled VM using the default [`VmConfig`].
    pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
        self.acquire_with(&VmConfig::new())
    }
    /// Acquire a pooled VM.
    ///
    /// NOTE: `config` only shapes the pool the first time one is created
    /// for this image; later calls reuse the existing pool regardless.
    pub fn acquire_with(&self, config: &VmConfig) -> Result<PooledVm<'_>, Error> {
        let pool_arc = self.ensure_default_pool(config)?;
        let worker = pool_arc.acquire()?;
        // Wrap the worker's endpoints in a Vm that skips its own cleanup:
        // the pool owns the worker's lifecycle, not the Vm.
        let vm = Vm {
            pool: None,
            vsock_mux_path: worker.vsock_mux_path.clone(),
            vsock_exec_path: worker.vsock_exec_path.clone(),
            own_vsock_mux_dir: None,
            skip_cleanup: true,
            image_meta: Some(Arc::new(ImageMeta {
                memory_mib: config.memory_mib.unwrap_or(self.memory_mib),
                vcpus: config.vcpus.unwrap_or(self.vcpus),
                layers: self.layers.clone(),
                delta_squashfs: self.delta_squashfs.clone(),
            })),
        };
        Ok(PooledVm {
            vm: Some(vm),
            worker: Some(worker),
            pool_arc: Arc::clone(pool_arc),
            _image: std::marker::PhantomData,
        })
    }
    /// Start configuring a dedicated pool for this image.
    pub fn pool(&self) -> PoolBuilder<'_> {
        PoolBuilder {
            image: self,
            policy: PoolPolicy::default(),
        }
    }
    /// Construct a pool: locate the worker binary, create the sockets
    /// directory, pre-spawn `policy.min` workers (in parallel when > 1),
    /// and start the background replenisher/restorer/janitor threads
    /// (detached; they exit via the `shutting_down` flag).
    fn build_pool_arc(
        &self,
        config: &VmConfig,
        policy: PoolPolicy,
    ) -> Result<Arc<HiddenPool>, Error> {
        #[cfg(target_os = "macos")]
        let worker_bin = crate::codesign::locate_worker_bin().ok_or_else(|| {
            Error::assets_msg(
                "supermachine-worker binary not found (looked for sibling of \
                 current_exe and target/release/supermachine-worker). Set \
                 SUPERMACHINE_WORKER_BIN if you have it elsewhere."
                    .to_owned(),
            )
        })?;
        #[cfg(not(target_os = "macos"))]
        let worker_bin: PathBuf = std::env::var_os("SUPERMACHINE_WORKER_BIN")
            .map(PathBuf::from)
            .ok_or_else(|| {
                Error::assets_msg(
                    "SUPERMACHINE_WORKER_BIN must be set on this platform".to_owned(),
                )
            })?;
        #[cfg(target_os = "macos")]
        {
            // Best-effort: signing failures surface later when spawning.
            let _ = crate::codesign::ensure_worker_signed(&worker_bin);
        }
        let socks_dir = match &config.vsock_mux_dir {
            Some(d) => d.clone(),
            None => PathBuf::from(format!(
                "/tmp/sm-pool-{}-{:x}",
                std::process::id(),
                unique_suffix(),
            )),
        };
        std::fs::create_dir_all(&socks_dir).map_err(Error::Io)?;
        // Config overrides win over the image's baked-in resources.
        let memory_mib = config.memory_mib.unwrap_or(self.memory_mib);
        let vcpus = config.vcpus.unwrap_or(self.vcpus);
        let spawn_timeout = config
            .restore_timeout
            .unwrap_or_else(|| Duration::from_secs(30));
        let name_prefix = "w".to_owned();
        let spawn_cfg = Arc::new(SpawnConfig {
            worker_bin,
            snapshot_path: self.snapshot_path.clone(),
            layers: self.layers.clone(),
            delta_squashfs: self.delta_squashfs.clone(),
            memory_mib,
            vcpus,
            socks_dir: socks_dir.clone(),
            name_prefix,
            spawn_timeout,
        });
        // Pre-warm `min` workers; use threads when spawning more than one.
        let initial = policy.min;
        let mut idle: Vec<IdleEntry> = Vec::with_capacity(initial);
        if initial == 1 {
            idle.push(IdleEntry {
                worker: spawn_cfg.spawn_one()?,
                last_used: Instant::now(),
            });
        } else if initial > 1 {
            let mut handles = Vec::with_capacity(initial);
            for _ in 0..initial {
                let cfg = Arc::clone(&spawn_cfg);
                handles.push(std::thread::spawn(move || cfg.spawn_one()));
            }
            for h in handles {
                let w = h
                    .join()
                    .map_err(|_| Error::vm_msg("pool spawn thread panicked".to_owned()))??;
                idle.push(IdleEntry {
                    worker: w,
                    last_used: Instant::now(),
                });
            }
        }
        let pool = Arc::new(HiddenPool {
            state: Arc::new(Mutex::new(PoolState {
                idle,
                alive: initial,
                waiting: 0,
            })),
            available: Arc::new(Condvar::new()),
            dirty: Some(Arc::new(Mutex::new(VecDeque::new()))),
            dirty_pending: Some(Arc::new(Condvar::new())),
            socks_dir,
            shutting_down: Arc::new(AtomicBool::new(false)),
            spawn_cfg: Arc::clone(&spawn_cfg),
            policy,
        });
        // Background threads hold only cloned handles, never the pool Arc,
        // so dropping the pool can actually run HiddenPool::drop.
        let wait_handles = pool.wait_handles();
        let h_replenish = {
            let h = wait_handles.clone();
            std::thread::Builder::new()
                .name("supermachine-pool-replenish".into())
                .spawn(move || replenisher_loop(h))
                .map_err(|e| Error::vm_msg(format!("spawn replenisher thread: {e}")))?
        };
        let mut handles = vec![h_replenish];
        if policy.restore_on_release {
            // Roughly max/2 restorers, clamped to 1..=4.
            let restorer_count = ((policy.max + 1) / 2)
                .clamp(1, 4)
                .min(policy.max.max(1));
            for _ in 0..restorer_count {
                let h = wait_handles.clone();
                let h_restore = std::thread::Builder::new()
                    .name("supermachine-pool-restore".into())
                    .spawn(move || restorer_loop(h))
                    .map_err(|e| Error::vm_msg(format!("spawn restorer thread: {e}")))?;
                handles.push(h_restore);
            }
        }
        if pool.policy.idle_timeout != Duration::MAX {
            let h = wait_handles.clone();
            let h_janitor = std::thread::Builder::new()
                .name("supermachine-pool-janitor".into())
                .spawn(move || janitor_loop(h))
                .map_err(|e| Error::vm_msg(format!("spawn janitor thread: {e}")))?;
            handles.push(h_janitor);
        }
        // Deliberately detach: threads terminate via `shutting_down`.
        drop(handles);
        Ok(pool)
    }
    /// Return the image's lazily-created default pool, building it with
    /// `config` on first use. If two threads race here, one pool wins the
    /// OnceLock and the loser is dropped (its Drop tears it down).
    fn ensure_default_pool(
        &self,
        config: &VmConfig,
    ) -> Result<&Arc<HiddenPool>, Error> {
        if let Some(p) = self.hidden_pool.get() {
            return Ok(p);
        }
        let pool = self.build_pool_arc(config, PoolPolicy::default())?;
        let _ = self.hidden_pool.set(pool);
        Ok(self
            .hidden_pool
            .get()
            .expect("hidden pool just initialized"))
    }
}
/// Cloneable bundle of the pool's shared handles, given to background
/// threads so they never hold the `Arc<HiddenPool>` itself (which would
/// keep the pool's `Drop` from ever running).
#[derive(Clone)]
struct PoolWaitHandles {
    state: Arc<Mutex<PoolState>>,
    available: Arc<Condvar>,
    dirty: Option<Arc<Mutex<VecDeque<Worker>>>>,
    dirty_pending: Option<Arc<Condvar>>,
    shutting_down: Arc<AtomicBool>,
    spawn_cfg: Arc<SpawnConfig>,
    policy: PoolPolicy,
}
impl HiddenPool {
fn wait_handles(&self) -> PoolWaitHandles {
PoolWaitHandles {
state: Arc::clone(&self.state),
available: Arc::clone(&self.available),
dirty: self.dirty.as_ref().map(Arc::clone),
dirty_pending: self.dirty_pending.as_ref().map(Arc::clone),
shutting_down: Arc::clone(&self.shutting_down),
spawn_cfg: Arc::clone(&self.spawn_cfg),
policy: self.policy,
}
}
}
/// Background thread: takes released (dirty) workers off the `dirty` queue,
/// re-restores them from the base snapshot, and puts them back on the idle
/// list. A worker that fails to restore is shut down and its slot freed.
/// Exits when `shutting_down` is set or a lock/condvar is poisoned; no-op
/// if the pool has no dirty queue.
fn restorer_loop(h: PoolWaitHandles) {
    let (Some(dirty), Some(pending)) = (h.dirty.as_ref(), h.dirty_pending.as_ref()) else {
        return;
    };
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        // Block (in 100 ms slices, so shutdown is noticed) until a dirty
        // worker is available.
        let mut worker = {
            let mut q = match dirty.lock() {
                Ok(q) => q,
                Err(_) => return,
            };
            loop {
                if h.shutting_down.load(Ordering::SeqCst) {
                    return;
                }
                if let Some(w) = q.pop_front() {
                    break w;
                }
                q = match pending.wait_timeout(q, Duration::from_millis(100)) {
                    Ok((g, _)) => g,
                    Err(_) => return,
                };
            }
        };
        // Restore outside any lock; it can take a while.
        let snap_path = h.spawn_cfg.snapshot_path.clone();
        match worker.send_restore(&snap_path) {
            Ok(()) => {
                if let Ok(mut s) = h.state.lock() {
                    s.idle.push(IdleEntry {
                        worker,
                        last_used: Instant::now(),
                    });
                    h.available.notify_all();
                }
            }
            Err(_) => {
                // Broken worker: retire it and free its slot.
                worker.shutdown();
                if let Ok(mut s) = h.state.lock() {
                    s.alive = s.alive.saturating_sub(1);
                }
                h.available.notify_all();
            }
        }
    }
}
/// Background thread: keeps the pool topped up to `policy.min` workers.
///
/// Each round: under the state lock, either wait briefly (pool already at
/// `min`) or reserve a slot by incrementing `alive`; then spawn outside the
/// lock and park the worker on the idle list, rolling the reservation back
/// (with a 500 ms backoff) on spawn failure. Exits on shutdown or a
/// poisoned state mutex.
///
/// Fixes two defects in the previous version: a poisoned lock at the
/// reservation step used to fall through and spawn a worker *without*
/// incrementing `alive` (corrupting accounting), and a freshly spawned
/// worker was leaked (process never killed) if the lock was poisoned while
/// parking it.
fn replenisher_loop(h: PoolWaitHandles) {
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        // Reserve a slot under the lock, or wait if the pool is full enough.
        {
            let mut s = match h.state.lock() {
                Ok(s) => s,
                // Poisoned: accounting can no longer be trusted; bail out
                // instead of spawning unaccounted workers.
                Err(_) => return,
            };
            if s.alive >= h.policy.min {
                // Nothing to do; doze on the condvar (or just poll) and
                // re-check shutdown/min on wakeup.
                let _ = h.available.wait_timeout(s, Duration::from_millis(100));
                continue;
            }
            s.alive += 1;
        }
        // Spawn outside the lock: spawning is slow and must not block
        // acquire()/release().
        match h.spawn_cfg.spawn_one() {
            Ok(mut w) => match h.state.lock() {
                Ok(mut s) => {
                    s.idle.push(IdleEntry {
                        worker: w,
                        last_used: Instant::now(),
                    });
                    h.available.notify_all();
                }
                Err(_) => {
                    // Poisoned after spawning: don't leak the process.
                    w.shutdown();
                    return;
                }
            },
            Err(_) => {
                // Roll back the reservation and back off so a persistently
                // failing spawn doesn't spin.
                if let Ok(mut s) = h.state.lock() {
                    s.alive = s.alive.saturating_sub(1);
                }
                std::thread::sleep(Duration::from_millis(500));
            }
        }
    }
}
/// Background thread: evicts workers that have sat idle longer than
/// `policy.idle_timeout`, never shrinking the pool below `policy.min`.
/// Eviction happens from the front of the idle list (oldest `last_used`
/// first, since releases push to the back). Shutdowns run outside the lock.
/// Exits when `shutting_down` is set; disabled when the timeout is MAX.
fn janitor_loop(h: PoolWaitHandles) {
    let timeout = h.policy.idle_timeout;
    let min = h.policy.min;
    if timeout == Duration::MAX {
        return;
    }
    // Wake roughly 4x per timeout window, at least every 100 ms.
    let tick = (timeout / 4).max(Duration::from_millis(100));
    let wait_unit = Duration::from_millis(100).min(tick);
    loop {
        if h.shutting_down.load(Ordering::SeqCst) {
            return;
        }
        // Collect expired workers under the lock, shut them down after.
        let mut to_evict: Vec<Worker> = Vec::new();
        if let Ok(mut s) = h.state.lock() {
            let now = Instant::now();
            while s.alive > min && !s.idle.is_empty() {
                // Front entry is the oldest; stop at the first fresh one.
                let oldest = &s.idle[0];
                if now.duration_since(oldest.last_used) < timeout {
                    break;
                }
                let entry = s.idle.remove(0);
                s.alive -= 1;
                to_evict.push(entry.worker);
            }
        }
        for mut w in to_evict {
            w.shutdown();
        }
        // Sleep for one tick in small condvar slices so shutdown is
        // noticed promptly.
        let mut remaining = tick;
        while remaining > Duration::ZERO && !h.shutting_down.load(Ordering::SeqCst) {
            let chunk = remaining.min(wait_unit);
            if let Ok(s) = h.state.lock() {
                let _ = h.available.wait_timeout(s, chunk);
            }
            remaining = remaining.saturating_sub(chunk);
        }
    }
}
/// A VM checked out of an [`Image`]'s pool. Derefs to [`Vm`]; on drop the
/// underlying worker is returned to the pool.
pub struct PooledVm<'a> {
    // Facade over the worker's endpoints; taken (None) on drop.
    vm: Option<Vm>,
    // The checked-out worker; taken on drop and handed back to the pool.
    worker: Option<Worker>,
    pool_arc: Arc<HiddenPool>,
    // Ties the borrow to the Image without storing a reference.
    _image: std::marker::PhantomData<&'a Image>,
}
impl<'a> std::ops::Deref for PooledVm<'a> {
    type Target = Vm;
    /// Panics only if used after `drop` has taken the inner `Vm`.
    fn deref(&self) -> &Vm {
        match self.vm.as_ref() {
            Some(vm) => vm,
            None => panic!("PooledVm used after drop"),
        }
    }
}
impl<'a> std::ops::DerefMut for PooledVm<'a> {
    /// Panics only if used after `drop` has taken the inner `Vm`.
    fn deref_mut(&mut self) -> &mut Vm {
        match self.vm.as_mut() {
            Some(vm) => vm,
            None => panic!("PooledVm used after drop"),
        }
    }
}
impl<'a> PooledVm<'a> {
    /// Snapshot this VM's current state into `dest_dir` and return it as a
    /// new loadable [`Image`].
    ///
    /// Writes `restore.snap` (via the worker's SNAPSHOT command) plus a
    /// `metadata.json` carrying the resources/layers recorded at acquire
    /// time. Errors if the VM/worker was already dropped, if the image
    /// metadata is missing (pre-0.3.8 acquire), or on any I/O / worker
    /// failure.
    pub fn snapshot(&self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
        let dest_dir = dest_dir.into();
        let worker = self
            .worker
            .as_ref()
            .ok_or_else(|| Error::vm_msg("PooledVm: no worker (already dropped?)".to_owned()))?;
        let vm = self
            .vm
            .as_ref()
            .ok_or_else(|| Error::vm_msg("PooledVm: no vm (already dropped?)".to_owned()))?;
        let meta = vm.image_meta.clone().ok_or_else(|| {
            Error::vm_msg(
                "PooledVm::snapshot: image metadata missing (acquire from a 0.3.8+ Image)"
                    .to_owned(),
            )
        })?;
        std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
        let snap_path = dest_dir.join("restore.snap");
        // Stats are currently unused but returned by the worker protocol.
        let _stats = worker.send_snapshot(&snap_path)?;
        let metadata = serde_json::json!({
            "memory_mib": meta.memory_mib,
            "vcpus": meta.vcpus,
            "layers": meta
                .layers
                .iter()
                .map(|p| p.to_string_lossy().to_string())
                .collect::<Vec<_>>(),
            "delta_squashfs": meta
                .delta_squashfs
                .as_ref()
                .map(|p| p.to_string_lossy().to_string()),
            "snapshot_base": snap_path.to_string_lossy().to_string(),
            "baked_at": chrono_rfc3339_now(),
            "source": "PooledVm::snapshot",
        });
        std::fs::write(
            dest_dir.join("metadata.json"),
            serde_json::to_string_pretty(&metadata)
                .map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
        )
        .map_err(Error::Io)?;
        // Round-trip through the loader to validate what we just wrote.
        Image::from_snapshot(&dest_dir)
    }
}
impl<'a> Drop for PooledVm<'a> {
    /// Discards the facade [`Vm`] (built with `skip_cleanup`, so dropping
    /// it is inert) and hands the worker back to the pool.
    fn drop(&mut self) {
        drop(self.vm.take());
        match self.worker.take() {
            Some(worker) => self.pool_arc.release(worker),
            None => {}
        }
    }
}
/// Builder for a [`Pool`] tied to a source image: configure limits and
/// timeouts, then call [`PoolBuilder::build`].
pub struct PoolBuilder<'a> {
    // Image the pool's workers will be spawned from.
    image: &'a Image,
    // Accumulated policy; applied (with min clamped to max) at build().
    policy: PoolPolicy,
}
impl<'a> PoolBuilder<'a> {
    /// Minimum number of warm workers to keep alive (clamped to `max` at build).
    pub fn min(mut self, n: usize) -> Self {
        self.policy.min = n;
        self
    }
    /// Maximum number of concurrent workers; forced to at least 1.
    pub fn max(mut self, n: usize) -> Self {
        self.policy.max = n.max(1);
        self
    }
    /// How long an idle worker may sit unused before eviction.
    pub fn idle_timeout(mut self, d: Duration) -> Self {
        self.policy.idle_timeout = d;
        self
    }
    /// Upper bound on how long an acquire blocks waiting for a free worker.
    pub fn acquire_timeout(mut self, d: Duration) -> Self {
        self.policy.acquire_timeout = Some(d);
        self
    }
    /// Let acquire wait indefinitely for a free worker.
    pub fn no_acquire_timeout(mut self) -> Self {
        self.policy.acquire_timeout = None;
        self
    }
    /// Whether a released worker is restored before being reused.
    pub fn restore_on_release(mut self, on: bool) -> Self {
        self.policy.restore_on_release = on;
        self
    }
    /// Construct the pool and register it as the image's pool.
    ///
    /// Fails if the image already has a pool (e.g. a default pool created
    /// by an earlier `acquire()`), because an existing pool's policy cannot
    /// be swapped in place.
    pub fn build(self) -> Result<Pool, Error> {
        let image = self.image;
        let mut policy = self.policy;
        // min > max would be unsatisfiable; clamp rather than error.
        if policy.min > policy.max {
            policy.min = policy.max;
        }
        let arc = image.build_pool_arc(&VmConfig::new(), policy)?;
        // set() succeeds only on a never-initialized cell, so this detects
        // an already-registered pool.
        match image.hidden_pool.set(Arc::clone(&arc)) {
            Ok(()) => Ok(Pool { inner: arc }),
            Err(_) => Err(Error::vm_msg(
                "image.pool().build() must be called before image.acquire() / \
                image.acquire_with(). The Image already has a default pool \
                from an earlier acquire — its policy can't be changed in \
                place. Either call pool().build() first, or load the Image \
                fresh: `Image::from_snapshot(path)?.pool()...build()?`."
                    .to_owned(),
            )),
        }
    }
}
/// Cloneable handle to a shared warm-VM pool; check out VMs with
/// [`Pool::acquire`] and inspect utilization with [`Pool::stats`].
#[derive(Clone)]
pub struct Pool {
    inner: Arc<HiddenPool>,
}
/// Point-in-time snapshot of pool utilization, as returned by [`Pool::stats`].
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Total live workers (idle plus in use).
    pub alive: usize,
    /// Workers currently checked out (`alive - idle`).
    pub in_use: usize,
    /// Workers parked and ready for the next acquire.
    pub idle: usize,
    /// Callers currently blocked waiting to acquire.
    pub waiting: usize,
    /// Configured worker ceiling.
    pub max: usize,
    /// Configured number of workers kept warm.
    pub min: usize,
}
impl Pool {
    /// Check a worker out of the pool and wrap it as a [`PooledVm`].
    ///
    /// The returned facade `Vm` borrows the worker's socket paths and is
    /// created with `skip_cleanup` so dropping it never touches the
    /// pool-owned sockets; the worker is released back on drop.
    pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
        let worker = self.inner.acquire()?;
        let cfg = &self.inner.spawn_cfg;
        let meta = ImageMeta {
            memory_mib: cfg.memory_mib,
            vcpus: cfg.vcpus,
            layers: cfg.layers.clone(),
            delta_squashfs: cfg.delta_squashfs.clone(),
        };
        let facade = Vm {
            pool: None,
            vsock_mux_path: worker.vsock_mux_path.clone(),
            vsock_exec_path: worker.vsock_exec_path.clone(),
            own_vsock_mux_dir: None,
            skip_cleanup: true,
            image_meta: Some(Arc::new(meta)),
        };
        Ok(PooledVm {
            vm: Some(facade),
            worker: Some(worker),
            pool_arc: Arc::clone(&self.inner),
            _image: std::marker::PhantomData,
        })
    }
    /// Current utilization counters; a poisoned state lock reports zeros.
    pub fn stats(&self) -> PoolStats {
        let (alive, idle, waiting) = match self.inner.state.lock() {
            Ok(s) => (s.alive, s.idle.len(), s.waiting),
            Err(_) => (0, 0, 0),
        };
        PoolStats {
            alive,
            in_use: alive.saturating_sub(idle),
            idle,
            waiting,
            max: self.inner.policy.max,
            min: self.inner.policy.min,
        }
    }
}
/// Builder that turns an OCI image reference into a bootable snapshot
/// [`Image`], baking (pull + snapshot) it on demand.
pub struct OciImageBuilder {
    // OCI reference, e.g. "docker.io/library/nginx:latest".
    image: String,
    // Snapshot name override; derived from `image` when None.
    name: Option<String>,
    pull_policy: PullPolicy,
    // Guest sizing overrides; bake defaults apply when None (256 MiB, 1 vcpu).
    memory_mib: Option<u32>,
    vcpus: Option<u32>,
    // Guest port recorded at bake time (default 80).
    guest_port: Option<u16>,
    // Container command override, JSON-encoded for the bake tool.
    cmd: Option<Vec<String>>,
    // Extra environment variables, passed to the bake as `--env k=v`.
    envs: Vec<(String, String)>,
    // Snapshot cache root; falls back to default_snapshots_dir().
    snapshots_dir: Option<PathBuf>,
    // One-shot hook run inside a booted VM; the resulting state is cached as
    // a separate "<name>__warm__<tag>" snapshot keyed by `warmup_tag`.
    warmup: Option<Box<dyn FnOnce(&Vm) -> Result<(), Error> + Send>>,
    warmup_tag: Option<String>,
}
impl OciImageBuilder {
    /// Start building from an OCI reference like `docker.io/library/nginx:latest`.
    pub fn new(image_ref: impl Into<String>) -> Self {
        Self {
            image: image_ref.into(),
            name: None,
            pull_policy: PullPolicy::default(),
            memory_mib: None,
            vcpus: None,
            guest_port: None,
            cmd: None,
            envs: Vec::new(),
            snapshots_dir: None,
            warmup: None,
            warmup_tag: None,
        }
    }
    /// Number of guest vCPUs (bake default: 1).
    pub fn with_vcpus(mut self, vcpus: u32) -> Self {
        self.vcpus = Some(vcpus);
        self
    }
    /// Snapshot name override (default: derived from the image reference).
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.name = Some(name.into());
        self
    }
    /// When to consult the snapshot cache versus the registry.
    pub fn with_pull_policy(mut self, policy: PullPolicy) -> Self {
        self.pull_policy = policy;
        self
    }
    /// Guest memory in MiB (bake default: 256).
    pub fn with_memory_mib(mut self, mib: u32) -> Self {
        self.memory_mib = Some(mib);
        self
    }
    /// Guest port recorded at bake time (default: 80).
    pub fn with_guest_port(mut self, port: u16) -> Self {
        self.guest_port = Some(port);
        self
    }
    /// Override the container command; JSON-encoded for the bake tool.
    pub fn with_cmd<I, S>(mut self, cmd: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.cmd = Some(cmd.into_iter().map(Into::into).collect());
        self
    }
    /// Add one environment variable (passed to the bake as `--env k=v`).
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.envs.push((key.into(), value.into()));
        self
    }
    /// Root directory of the snapshot cache.
    pub fn with_snapshots_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.snapshots_dir = Some(dir.into());
        self
    }
    /// Hook run once inside a freshly booted VM; its resulting state is
    /// cached as a separate warm snapshot (see [`Self::with_warmup_tag`]).
    pub fn with_warmup<F>(mut self, warmup: F) -> Self
    where
        F: FnOnce(&Vm) -> Result<(), Error> + Send + 'static,
    {
        self.warmup = Some(Box::new(warmup));
        self
    }
    /// Cache key distinguishing warm snapshots (default: "default").
    pub fn with_warmup_tag(mut self, tag: impl Into<String>) -> Self {
        self.warmup_tag = Some(tag.into());
        self
    }
    /// Resolve the image: return the cached snapshot when the policy allows,
    /// otherwise bake it from the registry; if a warmup hook is set, boot
    /// the baked image, run the hook, and cache the warmed state under
    /// `<name>__warm__<tag>`.
    ///
    /// # Errors
    /// `CacheMiss`/`CacheInvalid` under `PullPolicy::Never`; otherwise the
    /// registry/bake errors classified by `map_bake_error`.
    pub fn build(self) -> Result<Image, Error> {
        let snapshots_dir = self
            .snapshots_dir
            .unwrap_or_else(default_snapshots_dir);
        let derived_name = self
            .name
            .clone()
            .unwrap_or_else(|| crate::bake::snapshot_name_for_image(&self.image));
        let snap_dir = snapshots_dir.join(&derived_name);
        match self.pull_policy {
            PullPolicy::Never => {
                // Probe the cache once and return the loaded Image directly.
                // (Previously the snapshot was parsed twice — once for the
                // probe, once for the return — and the probe also ran for
                // policies that never used its result.)
                if let Ok(image) = Image::from_snapshot(&snap_dir) {
                    return Ok(image);
                }
                // A restore.snap that exists but failed to load means the
                // cache entry is stale/incompatible, not merely absent.
                let restore_snap = snap_dir.join("restore.snap");
                if restore_snap.is_file() {
                    return Err(Error::cache_invalid(format!(
                        "snapshot present at {} but not loadable on this binary; \
                        rebake required (PullPolicy::Never won't auto-rebake)",
                        snap_dir.display()
                    )));
                }
                return Err(Error::cache_miss(format!(
                    "no cached snapshot for {} at {} (PullPolicy::Never)",
                    self.image,
                    snap_dir.display()
                )));
            }
            _ => {}
        }
        let mut extra_args: Vec<String> = Vec::new();
        for (k, v) in &self.envs {
            extra_args.push("--env".to_owned());
            extra_args.push(format!("{k}={v}"));
        }
        // The bake tool takes the command override as a JSON array string.
        let cmd_override = match &self.cmd {
            Some(argv) => Some(
                serde_json::to_string(argv)
                    .map_err(|e| Error::bake_msg(format!("encode cmd: {e}")))?,
            ),
            None => None,
        };
        let root = repo_root_for_bake()?;
        let request = crate::bake::BakeRequest {
            image: self.image.clone(),
            name: self.name.clone(),
            runtime: "supermachine".to_owned(),
            guest_port: self.guest_port.unwrap_or(80),
            memory_mib: self.memory_mib.unwrap_or(256),
            vcpus: self.vcpus.unwrap_or(1),
            pull_policy: self.pull_policy.as_bake_str().to_owned(),
            snapshots_dir: snapshots_dir.clone(),
            cmd_override,
            extra_args,
        };
        let bake_t0 = std::time::Instant::now();
        // Failures are classified (not-found / auth / network / bake).
        crate::bake::run_push(&request, bake_t0, &root)
            .map_err(|e| map_bake_error(&request.image, e))?;
        let Some(warmup) = self.warmup else {
            return Image::from_snapshot(&snap_dir);
        };
        // Warm snapshots are cached per tag; reuse one when present.
        let tag = self.warmup_tag.as_deref().unwrap_or("default");
        let warm_dir = snapshots_dir.join(format!("{}__warm__{}", derived_name, tag));
        if let Ok(image) = Image::from_snapshot(&warm_dir) {
            return Ok(image);
        }
        let base = Image::from_snapshot(&snap_dir)?;
        let pooled = base.acquire_with(&VmConfig::new())?;
        warmup(&pooled)?;
        let warm_image = pooled.snapshot(&warm_dir)?;
        drop(pooled);
        Ok(warm_image)
    }
}
/// Root of the snapshot cache: `$SUPERMACHINE_SNAPSHOTS` when set,
/// otherwise `$HOME/.local/supermachine-snapshots` (falling back to the
/// current directory when `HOME` is unset).
fn default_snapshots_dir() -> PathBuf {
    match std::env::var_os("SUPERMACHINE_SNAPSHOTS") {
        Some(dir) => PathBuf::from(dir),
        None => std::env::var_os("HOME")
            .map_or_else(|| PathBuf::from("."), PathBuf::from)
            .join(".local/supermachine-snapshots"),
    }
}
fn repo_root_for_bake() -> Result<PathBuf, Error> {
if let Some(root) = std::env::var_os("SUPERMACHINE_ROOT") {
return Ok(PathBuf::from(root));
}
let exe = std::env::current_exe()
.map_err(|e| Error::bake_msg(format!("current_exe: {e}")))?;
for ancestor in exe.ancestors() {
if ancestor.join("tools/supermachine-push").is_file() {
return Ok(ancestor.to_path_buf());
}
if ancestor.join("share/supermachine/kernel").is_file() {
return Ok(ancestor.to_path_buf());
}
}
std::env::current_dir().map_err(|e| Error::bake_msg(format!("current_dir: {e}")))
}
/// Classify a textual bake/pull failure into a structured [`Error`].
///
/// Matching is heuristic: the lower-cased message is scanned for HTTP
/// status markers ("http 404"), registry phrases, and curl exit codes.
/// Order matters — not-found beats auth beats connectivity beats generic.
fn map_bake_error(image: &str, msg: String) -> Error {
    let lc = msg.to_ascii_lowercase();
    let has_status = |code: u16| -> bool {
        let needle = format!("http {code}");
        lc.contains(&needle)
    };
    // "not found" alone is too generic, so it additionally requires some
    // registry/manifest/image context. The parentheses make the grouping
    // explicit; the original relied on `&&` binding tighter than `||`
    // (clippy::precedence), which evaluates identically but reads wrong.
    if has_status(404)
        || lc.contains("manifest unknown")
        || lc.contains("name unknown")
        || (lc.contains("not found")
            && (lc.contains("registry") || lc.contains("manifest") || lc.contains("image")))
    {
        return Error::image_not_found(image, msg);
    }
    if has_status(401)
        || has_status(403)
        || lc.contains("unauthorized")
        || lc.contains("forbidden")
        || lc.contains("auth challenge")
    {
        return Error::registry_auth(image, msg);
    }
    // Connectivity-shaped failures, including common curl exit codes:
    // 6 = couldn't resolve host, 7 = couldn't connect, 28 = timeout,
    // 35 = TLS connect error, 56 = recv failure.
    if lc.contains("could not resolve")
        || lc.contains("dns")
        || lc.contains("connection refused")
        || lc.contains("connection reset")
        || lc.contains("network is unreachable")
        || lc.contains("ssl_connect")
        || lc.contains("tls handshake")
        || lc.contains("curl: (6)")
        || lc.contains("curl: (7)")
        || lc.contains("curl: (28)")
        || lc.contains("curl: (35)")
        || lc.contains("curl: (56)")
    {
        return Error::registry_unreachable(msg);
    }
    // Registry-flavored but unclassified → Network; anything else → Bake.
    if lc.contains("registry") || lc.contains("manifest") || lc.contains("docker pull") {
        Error::network_msg(msg)
    } else {
        Error::bake_msg(msg)
    }
}
/// Per-start overrides for booting a [`Vm`] from an [`Image`]; every field
/// is optional and falls back to the image / environment default.
#[derive(Debug, Clone, Default)]
pub struct VmConfig {
    // Guest memory override in MiB (image value when None).
    memory_mib: Option<u32>,
    // Guest vCPU count override (image value when None).
    vcpus: Option<u32>,
    // Kernel/asset locations; AssetPaths::discover() when None.
    assets: Option<AssetPaths>,
    // Directory for the per-VM unix sockets; the temp dir when None.
    vsock_mux_dir: Option<PathBuf>,
    // Snapshot restore deadline (10 s when None).
    restore_timeout: Option<Duration>,
}
impl VmConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.memory_mib = Some(mib);
self
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vcpus = Some(vcpus);
self
}
pub fn with_assets(mut self, assets: AssetPaths) -> Self {
self.assets = Some(assets);
self
}
pub fn with_vsock_mux_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.vsock_mux_dir = Some(dir.into());
self
}
pub fn with_restore_timeout(mut self, timeout: Duration) -> Self {
self.restore_timeout = Some(timeout);
self
}
}
/// A running microVM: owns (or, for pool-acquired facades, borrows) the
/// backing process pool and the pair of unix sockets used to reach the guest.
pub struct Vm {
    // Backing warm pool; None for pool-acquired facade Vms and after stop().
    pool: Option<WarmPool>,
    // Unix socket carrying the guest's multiplexed network traffic.
    vsock_mux_path: PathBuf,
    // Unix socket for exec/control messages to the in-guest agent.
    vsock_exec_path: PathBuf,
    // Set when this Vm created the socket directory and should remove it.
    own_vsock_mux_dir: Option<PathBuf>,
    // True for pool facades (sockets are pool-owned) and after Vm::snapshot,
    // making Drop a no-op.
    skip_cleanup: bool,
    // Sizing/layer info captured at start; needed to write snapshot metadata.
    image_meta: Option<Arc<ImageMeta>>,
}
/// Sizing and disk-layer description of the image a Vm was started from;
/// persisted into `metadata.json` when the Vm is re-snapshotted.
#[derive(Clone, Debug)]
pub(crate) struct ImageMeta {
    pub memory_mib: u32,
    pub vcpus: u32,
    // Image layers, attached as block devices in order.
    pub layers: Vec<PathBuf>,
    // Optional delta squashfs, attached after the layers.
    pub delta_squashfs: Option<PathBuf>,
}
impl Vm {
    /// Boot a VM by restoring `image`'s snapshot, applying any overrides
    /// from `config`.
    ///
    /// Creates per-VM unix sockets for the network mux and the exec/control
    /// channel, then restores the snapshot (default 10 s timeout). The
    /// returned `Vm` owns the backing pool and removes its sockets on
    /// `stop()`/drop.
    pub fn start(image: &Image, config: &VmConfig) -> Result<Vm, Error> {
        // On macOS, virtualization requires the HVF entitlement; fail early
        // with a clear message instead of a cryptic hypervisor error later.
        #[cfg(target_os = "macos")]
        if let Err(msg) = crate::codesign::check_self_has_hvf_entitlement() {
            return Err(Error::vm_msg(msg));
        }
        let assets = match &config.assets {
            Some(a) => a.clone(),
            None => AssetPaths::discover(),
        };
        // Kernel preference: snapshot-bundled kernel first, then discovered assets.
        let kernel: PathBuf = if let Some(k) = image.bundled_kernel.as_ref() {
            k.clone()
        } else if let Some(k) = assets.kernel.as_ref() {
            k.clone()
        } else {
            return Err(Error::assets_msg(
                "no kernel found: snapshot dir has no bundled kernel and AssetPaths::discover() came up empty; set VmConfig::with_assets() or $SUPERMACHINE_ASSETS_DIR".to_owned(),
            ));
        };
        let kernel = kernel.as_path();
        let dir = match &config.vsock_mux_dir {
            Some(d) => d.clone(),
            None => std::env::temp_dir(),
        };
        // Remember the dir only if we created it, so cleanup can remove it.
        let mut own_dir = None;
        if !dir.is_dir() {
            std::fs::create_dir_all(&dir).map_err(Error::Io)?;
            own_dir = Some(dir.clone());
        }
        // PID + time/counter suffix keeps socket paths collision-free across
        // processes and across VMs within one process.
        let vsock_mux_path = dir.join(format!(
            "supermachine-vm-{}-{}.sock",
            std::process::id(),
            unique_suffix(),
        ));
        // Exec socket shares the mux socket's name with a "-exec" suffix.
        let vsock_exec_path = {
            let mut p = vsock_mux_path.clone();
            let mut name = p.file_name().unwrap().to_owned();
            name.push("-exec");
            p.set_file_name(name);
            p
        };
        // Config overrides win over the image's baked-in sizing.
        let memory_mib = config.memory_mib.unwrap_or(image.memory_mib);
        let vcpus = config.vcpus.unwrap_or(image.vcpus);
        let mut resources = VmResources::new()
            .with_kernel_path(kernel.to_string_lossy().to_string())
            .with_memory_mib(memory_mib as usize)
            .with_vcpus(vcpus)
            .with_cow_restore(true)
            .with_restore(image.snapshot_path.to_string_lossy().to_string())
            .with_vsock_mux(vsock_mux_path.to_string_lossy().to_string())
            .with_vsock_exec(vsock_exec_path.to_string_lossy().to_string());
        // Block devices are attached in order: image layers first, then the
        // optional delta squashfs.
        for layer in &image.layers {
            resources = resources.with_block_device(layer.to_string_lossy().to_string());
        }
        if let Some(delta) = &image.delta_squashfs {
            resources = resources.with_block_device(delta.to_string_lossy().to_string());
        }
        let options = RunOptions::default();
        let pool = WarmPool::start(resources, options).map_err(Error::from)?;
        let timeout = config
            .restore_timeout
            .unwrap_or_else(|| Duration::from_secs(10));
        let _ = pool
            .restore_timeout(image.snapshot_path.to_string_lossy().to_string(), timeout)
            .map_err(Error::from)?;
        // Keep the sizing/layer info so this Vm can later be re-snapshotted.
        Ok(Vm {
            pool: Some(pool),
            vsock_mux_path,
            vsock_exec_path,
            own_vsock_mux_dir: own_dir,
            skip_cleanup: false,
            image_meta: Some(Arc::new(ImageMeta {
                memory_mib,
                vcpus,
                layers: image.layers.clone(),
                delta_squashfs: image.delta_squashfs.clone(),
            })),
        })
    }
    /// Path of the unix socket multiplexing the VM's network traffic.
    pub fn vsock_path(&self) -> &Path {
        &self.vsock_mux_path
    }
    /// Path of the unix socket used for exec/control messages.
    pub fn exec_path(&self) -> &Path {
        &self.vsock_exec_path
    }
    /// Spawn `argv` inside the guest and return a handle to the child.
    pub fn exec<I, S>(&self, argv: I) -> std::io::Result<crate::exec::ExecChild>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.exec_builder().argv(argv).spawn()
    }
    /// Builder variant of [`Vm::exec`] for configuring a guest command.
    pub fn exec_builder(&self) -> crate::exec::ExecBuilder {
        crate::exec::ExecBuilder::new(self.vsock_exec_path.clone())
    }
    /// Write `bytes` to `path` inside the guest. The payload travels
    /// base64-encoded over the JSON control channel.
    pub fn write_file(&self, path: &str, bytes: &[u8]) -> std::io::Result<()> {
        let body = serde_json::json!({
            "action": "write_file",
            "path": path,
            "data_b64": b64_encode(bytes),
        });
        crate::exec::send_control(&self.vsock_exec_path, &body)
    }
    /// Read the file at `path` inside the guest (30 s ack timeout).
    pub fn read_file(&self, path: &str) -> std::io::Result<Vec<u8>> {
        let body = serde_json::json!({
            "action": "read_file",
            "path": path,
        });
        let ack = crate::exec::send_control_with_ack(
            &self.vsock_exec_path,
            &body,
            Some(std::time::Duration::from_secs(30)),
        )?;
        // The agent returns the file contents base64-encoded in the ack.
        let data_b64 = ack
            .get("data_b64")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "read_file: agent ack missing data_b64",
                )
            })?;
        b64_decode(data_b64)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }
    /// Send `signum` to the guest workload via the control channel.
    pub fn workload_signal(&self, signum: i32) -> std::io::Result<()> {
        let body = serde_json::json!({
            "action": "signal",
            "signum": signum,
        });
        crate::exec::send_control(&self.vsock_exec_path, &body)
    }
    /// Open a raw connection to the VM's network mux socket.
    pub fn connect(&self) -> std::io::Result<UnixStream> {
        UnixStream::connect(&self.vsock_mux_path)
    }
    /// Forward 127.0.0.1:`host_port` into the VM; returns a guard that stops
    /// forwarding when dropped. NOTE(review): `_guest_port` is unused here —
    /// presumably the mux socket determines the guest-side target; confirm.
    pub fn expose_tcp(&self, host_port: u16, _guest_port: u16) -> std::io::Result<TcpForwarder> {
        let listener = TcpListener::bind(("127.0.0.1", host_port))?;
        // Resolve the actual port (host_port may be 0 for "any").
        let bound = listener.local_addr()?;
        listener.set_nonblocking(false)?;
        let stop = Arc::new(AtomicBool::new(false));
        let stop_thread = stop.clone();
        let vsock_path = self.vsock_mux_path.clone();
        let handle = std::thread::Builder::new()
            .name(format!("supermachine-tcp-{host_port}"))
            .spawn(move || {
                accept_loop(listener, vsock_path, stop_thread);
            })
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
        Ok(TcpForwarder {
            stop,
            handle: Some(handle),
            bound,
        })
    }
    /// Shut the VM down and remove its sockets, propagating pool shutdown
    /// errors.
    pub fn stop(mut self) -> Result<(), Error> {
        if let Some(pool) = self.pool.take() {
            let _ = pool.shutdown().map_err(Error::from)?;
        }
        self.cleanup_socket();
        Ok(())
    }
    /// Capture this VM's state into `dest_dir` as a restorable [`Image`],
    /// then shut the VM down (consumes `self`).
    ///
    /// Only works for VMs created via [`Vm::start`]; pool-acquired VMs use
    /// `PooledVm::snapshot` instead. NOTE(review): the metadata JSON schema
    /// here must stay in sync with the one written by `PooledVm::snapshot`.
    pub fn snapshot(mut self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
        let dest_dir = dest_dir.into();
        let meta = self.image_meta.clone().ok_or_else(|| {
            Error::vm_msg(
                "Vm::snapshot requires an in-process Vm (use image.start, not image.acquire)"
                    .to_owned(),
            )
        })?;
        let pool = self.pool.as_ref().ok_or_else(|| {
            Error::vm_msg("Vm::snapshot: no pool to drive the capture".to_owned())
        })?;
        std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
        let snap_path = dest_dir.join("restore.snap");
        // The pool drives the capture; allow up to 60 s.
        let _result = pool
            .snapshot_timeout(
                snap_path.to_string_lossy().to_string(),
                Duration::from_secs(60),
            )
            .map_err(|e| Error::Vm {
                msg: format!("snapshot capture failed: {e:?}"),
                source: None,
            })?;
        let metadata = serde_json::json!({
            "memory_mib": meta.memory_mib,
            "vcpus": meta.vcpus,
            "layers": meta
                .layers
                .iter()
                .map(|p| p.to_string_lossy().to_string())
                .collect::<Vec<_>>(),
            "delta_squashfs": meta
                .delta_squashfs
                .as_ref()
                .map(|p| p.to_string_lossy().to_string()),
            "snapshot_base": snap_path.to_string_lossy().to_string(),
            "baked_at": chrono_rfc3339_now(),
            "source": "Vm::snapshot",
        });
        std::fs::write(
            dest_dir.join("metadata.json"),
            serde_json::to_string_pretty(&metadata)
                .map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
        )
        .map_err(Error::Io)?;
        // Tear down manually, then disarm Drop so it doesn't repeat the work.
        if let Some(pool) = self.pool.take() {
            let _ = pool.shutdown();
        }
        self.cleanup_socket();
        self.skip_cleanup = true;
        Image::from_snapshot(&dest_dir)
    }
    /// Best-effort removal of the VM's socket files (and the socket
    /// directory, if this Vm created it).
    fn cleanup_socket(&self) {
        let _ = std::fs::remove_file(&self.vsock_mux_path);
        let _ = std::fs::remove_file(&self.vsock_exec_path);
        if let Some(dir) = &self.own_vsock_mux_dir {
            let _ = std::fs::remove_dir(dir);
        }
    }
}
/// Standard (RFC 4648) base64 alphabet, shared by the encoder and decoder.
const B64_ALPHA: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// Encode `bytes` as standard base64 with `=` padding.
pub(crate) fn b64_encode(bytes: &[u8]) -> String {
    // Every 3 input bytes become 4 output characters.
    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        let n = (group[0] as u32) << 16
            | (*group.get(1).unwrap_or(&0) as u32) << 8
            | (*group.get(2).unwrap_or(&0) as u32);
        out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
        out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
        // Short final groups pad with '=' instead of emitting data chars.
        out.push(if group.len() > 1 {
            B64_ALPHA[((n >> 6) & 0x3f) as usize] as char
        } else {
            '='
        });
        out.push(if group.len() > 2 {
            B64_ALPHA[(n & 0x3f) as usize] as char
        } else {
            '='
        });
    }
    out
}
/// Decode standard base64 (with `=` padding), ignoring ASCII whitespace.
///
/// Errors when the stripped input length is not a multiple of 4 or when a
/// character is outside the alphabet.
pub(crate) fn b64_decode(s: &str) -> Result<Vec<u8>, String> {
    // Reverse lookup table: 255 marks characters outside the alphabet.
    let mut rev = [255u8; 256];
    for (i, &c) in B64_ALPHA.iter().enumerate() {
        rev[c as usize] = i as u8;
    }
    let compact: Vec<u8> = s.bytes().filter(|b| !b.is_ascii_whitespace()).collect();
    if compact.len() % 4 != 0 {
        return Err(format!("base64 length {} is not a multiple of 4", compact.len()));
    }
    let mut out = Vec::with_capacity(compact.len() / 4 * 3);
    for quad in compact.chunks_exact(4) {
        let mut acc: u32 = 0;
        let mut pad = 0usize;
        for &c in quad {
            let sextet = if c == b'=' {
                pad += 1;
                0
            } else {
                let d = rev[c as usize];
                if d == 255 {
                    return Err(format!("invalid base64 character {:#x}", c));
                }
                d
            };
            acc = (acc << 6) | (sextet as u32);
        }
        // Each '=' of padding drops one trailing output byte.
        out.push((acc >> 16) as u8);
        if pad < 2 {
            out.push((acc >> 8) as u8);
        }
        if pad == 0 {
            out.push(acc as u8);
        }
    }
    Ok(out)
}
impl Drop for Vm {
    /// Best-effort teardown for Vms not stopped explicitly. Pool facades
    /// and already-snapshotted Vms set `skip_cleanup`, making this a no-op.
    fn drop(&mut self) {
        if !self.skip_cleanup {
            if let Some(pool) = self.pool.take() {
                let _ = pool.shutdown();
            }
            self.cleanup_socket();
        }
    }
}
/// Guard for an active host→guest TCP forward; stops the accept thread
/// when dropped or via [`TcpForwarder::stop`].
pub struct TcpForwarder {
    // Flag polled by the accept loop.
    stop: Arc<AtomicBool>,
    // Accept-loop thread; None once joined.
    handle: Option<JoinHandle<()>>,
    // Actual bound address (resolves port 0 to the real port).
    bound: SocketAddr,
}
impl TcpForwarder {
    /// The address the forwarder is actually bound to (useful when the
    /// caller asked for port 0).
    pub fn local_addr(&self) -> SocketAddr {
        self.bound
    }
    /// Stop forwarding and join the accept thread.
    pub fn stop(self) {
        // Dropping `self` runs the Drop impl, which performs the shutdown.
    }
    /// Raise the stop flag, poke the accept loop awake with a throwaway
    /// connection, then join the thread. Idempotent: the handle is taken.
    fn shutdown(&mut self) {
        self.stop.store(true, Ordering::SeqCst);
        // accept() blocks; a dummy connection wakes it so it can observe
        // the flag. A failed connect is fine — the loop may already be gone.
        let _ = TcpStream::connect_timeout(&self.bound, Duration::from_millis(200));
        if let Some(joiner) = self.handle.take() {
            let _ = joiner.join();
        }
    }
}
impl Drop for TcpForwarder {
    // Ensure the accept thread is stopped and joined even when the caller
    // never called stop(); shutdown() is idempotent (the handle is take()n).
    fn drop(&mut self) {
        self.shutdown();
    }
}
/// Accept TCP clients until `stop` is raised; each connection gets its own
/// thread splicing bytes into the VM's unix socket.
fn accept_loop(listener: TcpListener, vsock_path: PathBuf, stop: Arc<AtomicBool>) {
    loop {
        let accepted = listener.accept();
        // Check the flag after accept() returns: TcpForwarder::shutdown
        // pokes us with a dummy connection so this wakes up promptly.
        if stop.load(Ordering::SeqCst) {
            break;
        }
        let tcp = match accepted {
            Ok((stream, _peer)) => stream,
            Err(_) => continue,
        };
        let vsock = vsock_path.clone();
        // Per-connection thread; spawn failure just drops the connection.
        std::thread::Builder::new()
            .name("supermachine-tcp-conn".into())
            .spawn(move || {
                if let Err(e) = splice_tcp_to_unix(tcp, &vsock) {
                    eprintln!("supermachine: tcp forward: {e}");
                }
            })
            .ok();
    }
}
/// Bidirectionally copy bytes between a TCP client and the VM's unix
/// socket, one pump thread per direction; returns once both directions
/// have finished.
fn splice_tcp_to_unix(tcp: TcpStream, vsock_path: &Path) -> std::io::Result<()> {
    let unix = UnixStream::connect(vsock_path)?;
    // Each direction needs its own read/write handle pair, hence the clones.
    let tcp_reader = tcp;
    let tcp_writer = tcp_reader.try_clone()?;
    let unix_reader = unix;
    let unix_writer = unix_reader.try_clone()?;
    let client_to_guest = std::thread::Builder::new()
        .name("supermachine-tcp-c2g".into())
        .spawn(move || {
            let _ = pump(tcp_reader, unix_writer);
        })?;
    let guest_to_client = std::thread::Builder::new()
        .name("supermachine-tcp-g2c".into())
        .spawn(move || {
            let _ = pump(unix_reader, tcp_writer);
        })?;
    let _ = client_to_guest.join();
    let _ = guest_to_client.join();
    Ok(())
}
/// Copy bytes from `r` to `w` until EOF, then half-close `w` so the peer
/// sees end-of-stream. Interrupted reads are retried; any other error is
/// returned (without the half-close, matching the error path).
fn pump<R, W>(mut r: R, mut w: W) -> std::io::Result<()>
where
    R: Read,
    W: Write + Shutdownable,
{
    let mut buf = [0u8; 16 * 1024];
    loop {
        match r.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => w.write_all(&buf[..n])?,
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    // EOF reached: signal "no more data" downstream; failure is ignored.
    let _ = w.shutdown_write();
    Ok(())
}
/// Unifies write-side half-close over TCP and unix streams so `pump` can
/// signal EOF to the peer after copying finishes.
trait Shutdownable {
    fn shutdown_write(&mut self) -> std::io::Result<()>;
}
impl Shutdownable for TcpStream {
    // Half-close the write side; the peer sees EOF but can keep sending.
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        TcpStream::shutdown(self, std::net::Shutdown::Write)
    }
}
impl Shutdownable for UnixStream {
    // Half-close the write side; the peer sees EOF but can keep sending.
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        UnixStream::shutdown(self, std::net::Shutdown::Write)
    }
}
/// Process-wide unique-ish u64 for naming per-VM socket files: current Unix
/// time in nanoseconds plus a monotonically increasing counter, so two
/// calls within the same nanosecond still differ.
fn unique_suffix() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    // A clock before the epoch degrades to counter-only uniqueness.
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(d) => d.as_nanos() as u64,
        Err(_) => 0,
    };
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    nanos.wrapping_add(seq)
}
/// Current wall-clock time as an RFC 3339 UTC timestamp
/// (`YYYY-MM-DDThh:mm:ssZ`); a clock before the Unix epoch clamps to 1970.
fn chrono_rfc3339_now() -> String {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs() as i64)
        .unwrap_or(0);
    rfc3339_from_unix(secs)
}
/// Format Unix seconds as RFC 3339 UTC, deriving the civil date with
/// Howard Hinnant's `civil_from_days` algorithm (proleptic Gregorian).
/// Extracted from `chrono_rfc3339_now` so the calendar math is testable.
fn rfc3339_from_unix(secs: i64) -> String {
    // Split into whole days and second-of-day; the euclidean ops keep the
    // second-of-day non-negative for pre-1970 instants.
    let days = secs.div_euclid(86_400);
    let sod = secs.rem_euclid(86_400);
    let hh = sod / 3600;
    let mm = (sod % 3600) / 60;
    let ss = sod % 60;
    // Shift epoch to 0000-03-01 (719_468 days before 1970-01-01) and work
    // in 400-year eras of 146_097 days each.
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = z - era * 146_097; // day-of-era in [0, 146_096]
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; // year-of-era
    let y = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day-of-year, March-based
    let mp = (5 * doy + 2) / 153; // month index, 0 = March
    let d = doy - (153 * mp + 2) / 5 + 1;
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    // The March-based year rolls over in January/February.
    let y = if m <= 2 { y + 1 } else { y };
    format!("{y:04}-{m:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
}